Python3:sqlalchemy对mysql数据库操作,非sql语句详解编程语言

Python3:sqlalchemy对mysql数据库操作,非sql语句

# python3 
# author lizm 
# datetime 2018-02-01 10:00:00 
# -*- coding: utf-8 -*- 
''' 
    数据起始日期:2015-05-08 
    数据库:mysql 
''' 
import requests 
from bs4 import BeautifulSoup 
import json 
import pymysql 
import datetime 
import time 
import sys 
import logging 
from selenium import webdriver 
from sqlalchemy import Column,Integer, String,DateTime,create_engine 
from sqlalchemy.orm import sessionmaker 
from sqlalchemy.ext.declarative import declarative_base 
from sqlalchemy import and_,func 
import configparser 
import math 
 
logger = logging.getLogger() 
#set loghandler 
file = logging.FileHandler(sys.path[0]+"/py_zgjs_log"+time.strftime("%Y%m%d")+".log") 
logger.addHandler(file) 
#set formater 
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") 
file.setFormatter(formatter)  
#set log level 
logger.setLevel(logging.NOTSET) 
 
# 创建对象的基类: 
Base = declarative_base() 
 
class Yztzzqktjb(Base): 
 
    # 表名 
    __tablename__ = 'py_zgjs_yztzzqktjb' 
 
    # 表结构 
    id = Column(Integer,primary_key=True,autoincrement=True) 
    mc = Column(String(200),nullable=False) 
    begindate = Column(String(45),nullable=False) 
    enddate = Column(String(45), nullable=False) 
    sjmc = Column(String(200)) 
    ssjmc = Column(String(200)) 
    sl = Column(String(45)) 
    create_time = Column(DateTime,nullable=False) 
    update_time = Column(DateTime,nullable=False) 
 
    def __init__(self,mc,begindate,enddate,sjmc,ssjmc,sl,create_time,update_time): 
        self.mc = mc 
        self.begindate = begindate 
        self.enddate = enddate 
        self.sjmc = sjmc 
        self.ssjmc = ssjmc 
        self.sl = sl 
        self.create_time = create_time 
        self.update_time = update_time 
 
class ZgjsEntry(object): 
 
    def __init__(self, v1, v2,v3,v4,v5,v6): 
        self.v1 = v1 
        self.v2 = v2 
        self.v3 = v3 
        self.v4 = v4 
        self.v5 = v5 
        self.v6 = v6 
 
    def __get__(self, instance, cls): 
        if instance is None: 
            return self 
        else: 
            return instance.__dict__[self.name] 
 
    def __set__(self, instance, value): 
        instance.__dict__[self.name] = value 
 
    def __delete__(self, instance): 
        del instance.__dict__[self.name] 
 
def dbconfig(): 
    #生成config对象 
    cfg = configparser.ConfigParser() 
    #用config对象读取配置文件 
    path_ = sys.path[0] 
    cfg.read(path_+"/dbconfig.ini") 
    ip = cfg.get("dbserver", "ip") 
    port = cfg.get("dbserver", "port") 
    user = cfg.get("dbserver", "user") 
    password = cfg.get("dbserver", "password") 
    dbname = cfg.get("dbserver", "dbname") 
    endtime = cfg.get("dbtime", "endtime") 
    initdate = cfg.get("dbtime", "mzkbinitdate") 
    interval = cfg.get("dbtime", "interval") 
    return (ip,port,user,password,dbname,endtime,initdate,interval) 
 
def savrData(tableName,zgjsList): 
    msgcode = 0 
    message = '数据保存成功' 
    try: 
        dbcfg = dbconfig() 
        # 初始化数据库连接, 
        # 传入参数:数据库类型+连接库+用户名+密码+主机,字符编码,是否打印建表细节 
        engine = create_engine('mysql+mysqlconnector://'+dbcfg[2]+':'+dbcfg[3]+'@'+dbcfg[0]+':'+dbcfg[1]+'/'+dbcfg[4],encoding='utf-8') 
        # 创建DBSession类型: 
        DBSession = sessionmaker(bind=engine) 
        session = DBSession() 
        try: 
            # 增操作 
            items = [] 
            if tableName == 'Yztzzqktjb': 
                if len(zgjsList)>0: 
                    for i in range(0,len(zgjsList)): 
                        results = session.query(Yztzzqktjb).filter(and_(Yztzzqktjb.mc == zgjsList[i].v1,Yztzzqktjb.begindate == zgjsList[i].v2,Yztzzqktjb.enddate==zgjsList[i].v3,Yztzzqktjb.sjmc==zgjsList[i].v4,Yztzzqktjb.ssjmc==zgjsList[i].v5)).all() 
                        if len(results) > 0: 
                            session.query(Yztzzqktjb).filter(and_(Yztzzqktjb.mc == zgjsList[i].v1,Yztzzqktjb.begindate == zgjsList[i].v2,Yztzzqktjb.enddate==zgjsList[i].v3,Yztzzqktjb.sjmc==zgjsList[i].v4,Yztzzqktjb.ssjmc==zgjsList[i].v5)).update({Yztzzqktjb.sl: zgjsList[i].v6,Yztzzqktjb.update_time:time.strftime('%Y-%m-%d %H:%M:%S')}, synchronize_session=False) 
                        else: 
                            item = Yztzzqktjb(mc=zgjsList[i].v1,begindate=zgjsList[i].v2,enddate=zgjsList[i].v3,sjmc=zgjsList[i].v4,ssjmc=zgjsList[i].v5,sl=zgjsList[i].v6,create_time=time.strftime('%Y-%m-%d %H:%M:%S'),update_time=time.strftime('%Y-%m-%d %H:%M:%S')) 
                            items.append(item) 
            else: 
                pass 
            #print("len(items)>>>>>%s" %len(items)) 
            if len(items) > 0: 
                for i in range(0,len(items)): 
                    session.add(items[i]) 
            #提交数据 
            session.commit() 
        except Exception as e: 
            msgcode = 1 
            message = '数据保存失败' + str(e) 
            session.rollback() 
        finally: 
            #关闭 
            session.close() 
    except Exception as e: 
        msgcode = 1 
        message = '数据库连接失败'+str(e) 
    logger.info(message) 
    print(message) 
    return msgcode 
 
 
def getData(jsDate, channelIdStr,tableName): 
    zgjsList = [] 
    dateStr = jsDate[0:4]+'.'+jsDate[5:7]+'.'+jsDate[8:10] 
    # 查询按钮跳转url: 
    # http://www.******.cn/cms-search/view.action?action=china 
    url = "http://www.******.cn/cms-search/view.action?action=china" 
    headerDict = {'Host': 'www.*******.cn', 
                  'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.31 Safari/537.36', 
                  'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', 
                  'Accept-Language': 'zh-CN,zh;q=0.8', 
                  'Accept-Encoding': 'gzip, deflate', 
                  'Referer': 'http://www.******.cn/cms-search/view.action?action=china', 
                  'Connection': 'keep-alive'} 
    data = {'dateType': '', 'dateStr': dateStr, 
            'channelIdStr': channelIdStr}    
    # psot 传递参数 
    res = requests.post(url, data=data, headers=headerDict) 
    # 获取跳转后的页面源码 
    soup = BeautifulSoup(res.content, "html.parser") 
    #获取周报的起始日期 
    SettlementTitle = soup.find('div',class_='SettlementTitle') 
    if SettlementTitle is None: 
        return zgjsList 
    h2 = SettlementTitle.find('h2').text 
    if h2 == '搜索结果': 
        return zgjsList 
    weekdate = h2.strip().split('')[1].split('')[0] 
    begindate = weekdate.split('-')[0].replace('.','-') 
    enddate = weekdate.split('-')[1].replace('.','-') 
 
    settlementList = soup.find(id='settlementList') 
    # print(settlementList) 
    if settlementList is None: 
        return zgjsList 
    if settlementList.find('table') is None: 
        return zgjsList 
     
    table_ = settlementList.find('table') 
    tr_list = table_.find('table').find_all('tr') 
    # 上级名称 
    sjmc_1 = '' 
    sjmc_2 = '' 
    sjmc_3 = '' 
    sjmc_4 = '' 
    sjmc_5 = '' 
    sjmc_6 = '' 
    # 上上级名称 
    ssjmc_1 = '' 
    for n in range(1,len(tr_list)): 
        td_list = tr_list[n].find_all('td') 
        if tableName == 'Yztzzqktjb': 
            if n == 1: 
                sjmc_1 = td_list[0].get_text().replace('一、','').strip() 
            if n == 4: 
                sjmc_2 = td_list[0].get_text().replace('二、','').strip() 
                ssjmc_1 = td_list[0].get_text().replace('二、','').strip() 
            if n == 5: 
                sjmc_3 = td_list[0].get_text().replace('1、','').strip() 
            if n == 9: 
                sjmc_4 = td_list[0].get_text().replace('2、','').strip() 
            if n == 13: 
                sjmc_5 = td_list[0].get_text().replace('三、','').strip() 
            if n == 17: 
                sjmc_6 = td_list[0].get_text().replace('四、','').strip() 
 
        if tableName == 'Yztzzqktjb': 
            if n in (6,10,14,18): 
                continue 
        zgjs = ZgjsEntry('','','','','','') 
        zgjs.v2 = begindate 
        zgjs.v3 = enddate 
 
        if tableName == 'Yztzzqktjb': 
            # 上级名称 
            if n in (2,3): 
                zgjs.v4 = sjmc_1 
            if n in (5,9): 
                zgjs.v4 = sjmc_2 
            if n in (7,8): 
                zgjs.v4 = sjmc_3 
                zgjs.v5 = ssjmc_1 
            if n in (11,12): 
                zgjs.v4 = sjmc_4 
                zgjs.v5 = ssjmc_1 
            if n in (15,16): 
                zgjs.v4 = sjmc_5 
            if n in (19,20): 
                zgjs.v4 = sjmc_6 
        for i in range(0,len(td_list)): 
            if i == 0: 
                zgjs.v1 =td_list[i].get_text().replace('一、','').replace('二、','').replace('三、','').replace('四、','').replace('1、','').replace('2、','').strip() 
            if i == 1: 
                zgjs.v6 =td_list[i].get_text().strip().replace(',','') 
        if zgjs is not None: 
            zgjsList.append(zgjs) 
    return zgjsList 
 
# 获取开始日期: 
def getBeginDate(bgdate,tableName): 
    r_date = bgdate 
    try: 
        dbcfg = dbconfig() 
        # 初始化数据库连接, 
        # 传入参数:数据库类型+连接库+用户名+密码+主机,字符编码,是否打印建表细节 
        engine = create_engine('mysql+mysqlconnector://'+dbcfg[2]+':'+dbcfg[3]+'@'+dbcfg[0]+':'+dbcfg[1]+'/'+dbcfg[4],encoding='utf-8') 
        # 创建DBSession类型: 
        DBSession = sessionmaker(bind=engine) 
        session = DBSession() 
        try: 
            if tableName == 'Yztzzqktjb': 
                results = session.query(func.max(Yztzzqktjb.enddate)).all() 
                if len(results) != 0: 
                    r_date = results[0] 
            else: 
                pass 
        except Exception as e: 
            print('获取开始日期,查询异常;%s'%str(e)) 
            logger('获取开始日期,查询异常;%s'%str(e)) 
            session.rollback() 
        finally: 
            #关闭 
            session.close() 
    except Exception as e: 
        print('获取开始日期,数据库连接失败;%s'%str(e)) 
        logger('获取开始日期,数据库连接失败;%s'%str(e)) 
    if r_date[0] is None: 
        r_date = bgdate 
    else: 
        begin = time.strptime(r_date[0], "%Y-%m-%d") 
        y,m,d = begin[0:3] 
        r_date =  datetime.date(y,m,d) + datetime.timedelta(days=7) 
        r_date = r_date .strftime('%Y-%m-%d') 
    return r_date 
 
def isCheckData(date_): 
    r_code = 0 
    try: 
        dbcfg = dbconfig() 
        # 初始化数据库连接, 
        # 传入参数:数据库类型+连接库+用户名+密码+主机,字符编码,是否打印建表细节 
        engine = create_engine('mysql+mysqlconnector://'+dbcfg[2]+':'+dbcfg[3]+'@'+dbcfg[0]+':'+dbcfg[1]+'/'+dbcfg[4],encoding='utf-8') 
        # 创建DBSession类型: 
        DBSession = sessionmaker(bind=engine) 
        session = DBSession() 
        try: 
            # 
            results = session.query(Yztzzqktjb).filter(and_(func.datediff(Yztzzqktjb.enddate,date_)<6,func.datediff(Yztzzqktjb.enddate,date_)>-2)).all() 
            if len(results) == 0: 
                r_code = 1 
            else: 
                r_code = 0 
        except Exception as e: 
            r_code = 1 
            print('判断是否有数据异常;%s'%str(e)) 
            logger('判断是否有数据异常;%s'%str(e)) 
            session.rollback() 
        finally: 
            #关闭 
            session.close() 
    except Exception as e: 
        r_code = 1 
        print('判断是否有数据,数据库连接异常;%s'%str(e)) 
        logger('判断是否有数据,数据库连接异常;%s'%str(e)) 
    return r_code 
 
# 执行入口 
def main(initdate_): 
    req_list = [ 
    {'report':'6ac54ce22db4474abc234d6edbe53ae7','table':'Yztzzqktjb'} 
    ] 
    for req in req_list: 
        #字符转日期 
        begin = time.strptime(getBeginDate(initdate_,req['table']), "%Y-%m-%d") 
        y,m,d = begin[0:3] 
        #日期格式:2018-01-18 
        begin = datetime.date(y,m,d) 
        #获取当前日期 
        end = datetime.date.today() 
        if (end- begin).days < 0: 
            pass 
        else: 
            for i in range(math.ceil((end - begin).days/7)+1): 
                list_szzj = [] 
                # 日期转字符 
                date_ = (begin+datetime.timedelta(days=i*7)).strftime('%Y-%m-%d') 
                list_mzkb = getData(date_,req['report'],req['table']) 
                if len(list_mzkb): 
                    savrData(req['table'],list_mzkb) 
                else: 
                    pass 
                time.sleep(0.5) 
                if i % 350 == 0: 
                    time.sleep(15) 
 
 
if __name__ == '__main__': 
    vrg_date = '20150509' 
    dbcfg = dbconfig() 
    vrg_endtime = dbcfg[5][0:2]+":"+dbcfg[5][2:4]+":"+dbcfg[5][4:6] 
    var_initdate = dbcfg[6][0:4]+"-"+dbcfg[6][4:6]+"-"+dbcfg[6][6:8] 
    var_interval = int(dbcfg[7]) 
     
    if len(vrg_date) ==8: 
        vrg_date = str(vrg_date[0:4]) + "-" + str(vrg_date[4:6]) + "-" + str(vrg_date[6:8]) 
        end_time = time.strptime(vrg_endtime, "%H:%M:%S") 
        y,m,d = end_time[3:6] 
        end_time = datetime.time(y,m,d) 
        # 循环采集 
        while True:  
            now_time = time.strftime("%H%M%S") 
            main(var_initdate) 
            if isCheckData(vrg_date,) == 0: 
                logger.info("采集数据结束") 
                print("采集数据结束") 
                break 
            # 时间到停止采集 
            if int(end_time.strftime('%H%M%S')) - int(now_time) <= 0: 
                logger.info("采集数据结束") 
                print("采集数据结束") 
                break 
            # 间隔执行时间 
            logger.info("**********************(%s):没有采集到数据,任务继续执行**********************" %vrg_date) 
            print("********************(%s):没有采集到数据,任务继续执行**********************" %vrg_date) 
            time.sleep(var_interval) 
        else: 
            logger.info("日期参数格式不正确,请用格式:20180205") 
            print("日期参数格式不正确,请用格式:20180205") 
        

 

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/16707.html

(0)
上一篇 2021年7月19日 19:31
下一篇 2021年7月19日 19:31

相关推荐

发表回复

登录后才能评论