效果图
目录结构说明:以下这些目录必须存在(位于程序所在目录下,并从该目录启动程序):test、test1、backup/test、backup/test1,以及存放 test_data.html 的 templates 目录。
效果说明:
导出指定表的数据到指定目录,并输出两个目录下的同一文件里的数据以供比较。
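yaml配置文件说明:配置文件名为 conf2.yaml,与程序放在同一目录;程序会读取其中的 host、user、passwd、db(数据库连接信息)、web_ip、web_port(web 服务监听地址和端口)以及 table_data(“表名: 查询SQL” 的映射,每一项导出为一个同名的 xls 文件)。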
html页面代码(保存为 templates/test_data.html):
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>json串转换</title>
</head>
<div>
所有的表:{{msg3}}<br><br>
<!-- Export/compare form: posts the table name and the chosen action back to "/".
     The duplicate action attribute and the onSubmit hook that called an
     undefined checkform() were removed; browsers only honoured the first
     action ("/") anyway. -->
<form action="/" method="POST" name="form1">
    表名:<input type="text" placeholder="表名" name="test_table_name" />
    <br><br>
    选项:<br>
    <input type="radio" name="choice" value="pre">导出交易/清算前的数据【test目录】<br>
    <input type="radio" name="choice" value="fin">导出交易/清算后的数据【test1目录】<br>
    <input type="radio" name="choice" value="com">对比数据<br>
    <br><br>
    <input type="submit" value="提交" id="input2"/>
</form>
</div>
<div class="contant">
<p class="top-title">对比的数据</p>
</div>
<div id="content">
{{msg}}
{{msg2}}
</div>
<style type="text/css">
#input1{
height: 20px;
width: 1600px;
border-radius:12px ;
}
#input2{
height: 30px;
width: 700px;
border-radius:12px ;
}
</style>
</body>
<script type="text/javascript">
/**
 * Turn the serialized table text into an HTML table.
 *
 * The input uses "|" as the row separator and "," as the cell separator.
 * Segments that contain only whitespace (for example the text after the
 * final "|") are skipped so that they do not render as empty rows.
 *
 * @param {string} msg  Serialized rows, e.g. "a,b|c,d|".
 * @returns {string}    Markup of a complete <table> element.
 */
function formatStr(msg) {
    var rows = msg.split("|");
    var html = '<table border="1" cellspacing="0" bordercolor="#ADADAD">';
    for (var i = 0; i < rows.length; i++) {
        if (rows[i].trim() === "") {
            continue;
        }
        var cells = rows[i].split(",");
        html += "<tr>";
        for (var j = 0; j < cells.length; j++) {
            html += "<th>" + cells[j] + "</th>";
        }
        // Close the row; the original emitted a second opening <tr> here.
        html += "</tr>";
    }
    html += "</table>";
    return html;
}
document.getElementById('content').innerHTML = formatStr(document.getElementById('content').innerHTML);
</script>
</html>
python代码:
# --*-- coding:utf8 --*--
import pymysql, xlwt,xlrd
import pandas as pd
import openpyxl
import csv
import json
import requests
from flask_script import Manager
from flask import Flask, request, jsonify, render_template
import yaml
import os
import shutil
import datetime
def load_config():
    """Load the application settings from ``conf2.yaml``.

    The file is opened relative to the current working directory and decoded
    as UTF-8.

    Returns:
        The parsed YAML document as produced by ``yaml.load``.
    """
    # The context manager closes the handle even if parsing raises.
    with open('conf2.yaml', 'r', encoding='utf-8') as f:
        # NOTE(review): FullLoader is kept from the original; yaml.safe_load
        # would be preferable if the file never needs non-standard tags.
        return yaml.load(f.read(), Loader=yaml.FullLoader)
def backup(back_path):
    """Snapshot an export directory into a fresh, timestamped backup folder.

    Every regular file directly inside ``<program dir>/<back_path>`` is copied
    to ``<program dir>/backup/<back_path>/test_backup_<timestamp>``, where
    ``<program dir>`` is the directory containing this source file.

    Args:
        back_path: Name of the export directory to back up (the web route
            passes ``'test'`` or ``'test1'``).

    Raises:
        FileNotFoundError: If the export directory does not exist.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    print('file_full_path', base_dir)

    # Back up the directory that was asked for; the original always read
    # from 'test', so a 'test1' backup contained the wrong files.
    source_dir = os.path.join(base_dir, back_path)
    names = os.listdir(source_dir)

    stamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')
    target_dir = os.path.join(base_dir, 'backup', back_path,
                              'test_backup_' + stamp)
    # makedirs also creates backup/<back_path> when it is missing.
    os.makedirs(target_dir)

    for name in names:
        src = os.path.join(source_dir, name)
        # copyfile cannot copy directories, so only regular files are taken.
        if os.path.isfile(src):
            print("----文件备份----")
            shutil.copyfile(src, os.path.join(target_dir, name))
def html_data_format(data):
    """Serialize *data* into the text format consumed by the HTML page.

    The known status messages are returned unchanged. Anything else is
    treated as an iterable of rows: the cells of each row are joined with
    ``","`` and every row is terminated with ``"|"``.

    Args:
        data: One of the status message strings, or a sized iterable of rows.

    Returns:
        The status message itself, or the serialized rows (an empty string
        when *data* is empty).
    """
    status_messages = (
        '交易/清算前数据导出完成',
        '交易/清算后数据导出完成',
        '请输入表名',
        'test目录下不存在该表',
        'test1目录下不存在该表',
        '请做出选择',
    )
    # Tuple membership compares with ==, so list arguments are fine here.
    if data in status_messages:
        return data
    if len(data) == 0:
        return ""
    pieces = []
    for row in data:
        pieces.append(','.join("%s" % cell for cell in row))
        pieces.append("|")
    return "".join(pieces)
def export_excel(table_name, sql, path, cfg):
    """Run *sql* against MySQL and save the result as ``./<path>/<table_name>.xls``.

    The first sheet row holds the column names taken from the cursor
    description; the following rows hold the fetched records.

    Args:
        table_name: Used as the file name (without extension) of the export.
        sql: Query to execute.
        path: Directory, relative to the working directory, to write into.
        cfg: Mapping providing the ``host``, ``user``, ``passwd`` and ``db``
            connection settings.
    """
    host, user, passwd, db = cfg['host'], cfg['user'], cfg['passwd'], cfg['db']
    # NOTE(review): the port is hard-coded to 33061 rather than read from cfg.
    conn = pymysql.connect(user=user, host=host, port=33061, passwd=passwd,
                           db=db, charset='utf8')
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
            fields = [field[0] for field in cur.description]  # column names
            all_data = cur.fetchall()
    finally:
        # Release the connection even when the query fails; the original
        # never closed it.
        conn.close()

    book = xlwt.Workbook()
    sheet = book.add_sheet('sheet1')
    for col, field in enumerate(fields):
        sheet.write(0, col, field)
    # Data rows start at 1 because row 0 is the header.
    for row, record in enumerate(all_data, start=1):
        for col, value in enumerate(record):
            sheet.write(row, col, value)
    book.save("./%s/%s.xls" % (path, table_name))
app = Flask(__name__)
def _read_sheet_rows(xls_path):
    """Return all rows of the first sheet of *xls_path* as a list of lists."""
    sheet = xlrd.open_workbook(xls_path).sheets()[0]
    return [sheet.row_values(i) for i in range(sheet.nrows)]


@app.route("/", methods=["GET", "POST"])
def get_sum():
    """Serve the export/compare page.

    GET renders the page with the list of configured tables. POST reads the
    ``choice`` and ``test_table_name`` form fields and then:

    * ``pre`` / ``fin``: backs up the ``test`` / ``test1`` directory and
      re-exports every configured table into it;
    * ``com``: loads ``<test_table_name>.xls`` from both directories so the
      page can show them together;
    * anything else: asks the user to make a choice.

    Returns:
        The rendered ``test_data.html`` template.
    """
    tables = cfg['table_data']
    show = list(tables)
    print("show is:", show)

    if request.method != "POST":
        # Pass the table list on GET as well so the page header is populated.
        return render_template("test_data.html", msg3=show)

    choice = request.form.get('choice', '')
    test_table_name = request.form.get('test_table_name', '').strip()

    # Export directory and completion message for each export choice.
    exports = {
        'pre': ('test', '交易/清算前数据导出完成'),
        'fin': ('test1', '交易/清算后数据导出完成'),
    }

    data_new = '!'
    if choice in exports:
        path, data = exports[choice]
        backup(path)
        for table_name, sql in tables.items():
            export_excel(table_name, sql, path, cfg)
    elif choice == 'com':
        if not test_table_name:
            data = '请输入表名'
        else:
            file_name = test_table_name + '.xls'
            old_path = os.path.join('test', file_name)
            new_path = os.path.join('test1', file_name)
            # The name comes from the form: refuse anything carrying a
            # directory component so it cannot reach outside test/test1.
            is_plain_name = os.path.basename(test_table_name) == test_table_name
            if not is_plain_name or not os.path.isfile(old_path):
                data = 'test目录下不存在该表'
            elif not os.path.isfile(new_path):
                data = 'test1目录下不存在该表'
            else:
                data = _read_sheet_rows(old_path)
                data_new = _read_sheet_rows(new_path)
    else:
        # Covers both a missing choice and an unexpected value; the original
        # raised UnboundLocalError for the latter.
        data = '请做出选择'

    html_data = html_data_format(data)
    html_data2 = html_data_format(data_new)
    return render_template("test_data.html", error_code='0', msg=html_data,
                           msg2=html_data2, msg3=show)
if __name__ == '__main__':
    # cfg must stay a module-level global: get_sum() reads it on each request.
    cfg = load_config()
    app.config["JSON_AS_ASCII"] = False
    # The Flask-Script Manager was dropped: it was constructed with an
    # (ip, port) tuple in place of the app, and manager.run() could only
    # execute after the blocking app.run() call had returned.
    app.run(host=cfg['web_ip'], port=cfg['web_port'])
原创文章,作者:bd101bd101,如若转载,请注明出处:https://blog.ytso.com/277955.html