python如何连接数据库(SSH方式)
性能测试时,有个支付订单的场景,需要用到已生成的订单code,如何获取订单code?
一,通过Jmeter连接数据库获取。二,直接mysql导出数据
我这里是使用python导出mysql数据,使用的SSH方式:
import pymysql
import csv
import pandas as pd
from sshtunnel import SSHTunnelForwarder # ssh连接库
import base64
orderCode_path = r"D:/t_order.csv" # 保存地址
def mysql_ssh(sql,args=None):
with SSHTunnelForwarder(
('SSH_IP', SSH_端口号),
ssh_password='SSH密码',
ssh_username='SSH用户名',
local_bind_address=('127.0.0.1', 22), # 本地绑定端口
remote_bind_address=('远程数据库地址', 3306)) as server:
print('SSH连接成功')
conn = pymysql.connect(host='127.0.0.1',
port=22,
user='数据库名',
password='数据库登录密码',
database='数据库登录账号',
charset='utf8')
print('mysql数据库连接成功')
cursor = conn.cursor()
print('游标获取成功')
try:
print(f'执行查询语句:{sql} 参数:{args}')
cursor.execute(sql,args)
print('数据查询成功')
conn.commit()
print('事务提交成功')
datas = cursor.fetchall()
success = True
except:
print('数据查询失败')
datas = None
success = False
print('正在关闭数据库连接')
cursor.close()
conn.close()
return datas, success
def execute_sql(sql):
try:
result = []
datas = mysql_ssh(sql)
if datas[-1]:
print(f"共找到{len(datas[0])}个数据")
result = list(datas[0])
try:
datas_write(result)
except Exception as error:
print(error)
else:
result.clear()
else:
print('数据查询失败')
except Exception as e:
print(e)
def datas_write(a):
dataframe = pd.DataFrame(a)
dataframe.to_csv(orderCode_path, mode= 'w', index=False, sep=',',header=["orderCode", "userId"])
print("写入成功!")
if __name__ == "__main__":
sql = r"SELECT order_code,user_id FROM `t_order`;" # 导出所有的订单code
execute_sql(sql)
还遇到一个需要用户token的场景,用户token是存在redis的,于是通过python获取redis的数据,SSH方式如下:
import redis
import pandas as pd
from sshtunnel import SSHTunnelForwarder # ssh连接库
token_path = r"./login_token.csv" # redis数据保存路径
server = SSHTunnelForwarder(
ssh_address_or_host= ('SSH地址',22), # ssh地址
ssh_username= "SSH用户", # ssh连接的用户名
ssh_password= "SSH用户密码" ,
remote_bind_address=('远程redis地址', 6379))
def execute_sql():
try:
r = redis.Redis(host='127.0.0.1', port=server.local_bind_port, decode_responses=True, password='redis密码', db=0, encoding='gb18030') #encoding可以不加
print("连接成功")
keys = r.keys(pattern="*userId*")
print(f"共找到{len(keys)}个数据")
result = []
for i in keys:
try:
result.append(r.get(i))
except Exception as e:
print("{}不存在".format(i))
result = None
except Exception as e:
print(e)
print(f"共保存{len(result)}个数据")
return result
def token_write(a):
dataframe = pd.DataFrame({'token': a})
dataframe.to_csv(token_path, mode= 'a', index=False, sep=',',header=False)
print("写入成功!")
if __name__ == "__main__":
server.start()
token = execute_sql()
token_write(token)
server.close()
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/python/280768.html