数据库连接池
flask 可以使用pymysql来操作数据库
借助于第三方模块,实现数据库连接池
1.安装 pip install dbutils
2.pool.py
'''
from dbutils.pooled_db import PooledDB
import pymysql
POOL=PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0,
# ping MySQL服务端,检查是否服务可用。
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='luffy_api',
charset='utf8'
)
'''
使用
from flask import Flask
from pool import POOL
app = Flask(__name__)
@app.route('/banner')
def banner():
# 从连接池中拿连接,创建cursor对象
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute('select * from db')
print(cursor.fetchall())
return 'hello'
查看当前有多少个连接数
mysql终端:
show status like 'Threads%'
锁
# 并发编程学的锁:
-互斥锁:Lock
-递归锁(可重入锁):RLock,互斥锁会出现死锁现象
-Semaphore:多把锁,信号量
-event事件:某个条件完成成,线程执行
# 所有锁:https://zhuanlan.zhihu.com/p/489305763
信号
django中也有信号,参考https://www.cnblogs.com/liuqingzheng/articles/9803403.html
安装
pip install blinker
概念
在整个flask请求生命周期中,运行到某一个特定的地方时,会自动触发(aop理念,面向切面编程的理念)
内置信号
request_started = _signals.signal('request-started') # 请求到来前执行
request_finished = _signals.signal('request-finished') # 请求结束后执行
before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
template_rendered = _signals.signal('template-rendered') # 模板渲染后执行
got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行
request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发
内置信号使用步骤
from flask import Flask, render_template
from flask import signals
app = Flask(__name__)
(1)写一个函数
def render_brfore(*args,**kwargs):
print(args)
print(kwargs)
print('模板要渲染了')
(2)跟信号绑定
signals.before_render_template.connect(render_before)
(3)触发信号
@app.route('/')
def index():
print('index 执行了')
return render_template('index.html',name='春游去动物园')
if __name__ == '__main__':
app.run()
自定义信号
from flask import Flask, render_template,request
from flask import signals
from flask.signals import _signals
app = Flask(__name__)
(1)定义信号
xxx = _signals.signal('xxx')
(2)写个函数
def add(*args,**kwargs):
print(args)
print(kwargs)
print('add执行了')
(3)绑定信号
xxx.connect(add)
(4)触发信号
@app.route('/')
def index():
xxx.send()
print('index 执行了')
return render_template('index.html',name='春游去动物园')
if __name__ == '__main__':
app.run()
flask-script
djagno中执行djagno程序,通过命令:python3 manage.py runserver--->还有其他很多命令
想让flask能够执行python3 manage.py runserver启动flask项目,自定制一些命令,完成更多操作
借助于第三方 flask-script来完成
pip install flask-script
from flask import Flask
from flask_script import Manager
app = Flask(__name__)
# 基本使用
manager = Manager(app) # 使用Manager类将app包裹起来
'''
现在只能使用命令方式运行项目,多了runserver方法
'''
# 高级使用:自定义命令
@manager.command
def dbinit():
'''
python manager = Manager(app)所在的文件名 dbinit
python manager.py dbinit
'''
print('数据库初始化完成')
@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
"""
自定义命令(-n也可以写成--name)
执行: python manage.py cmd -n lqz -u xxx
执行: python manage.py cmd --name lqz --url yyy
"""
print(name, url)
@app.route('/index')
def index():
return "app01"
if __name__ == "__main__":
# app.run()
manager.run() # 需要修改为manager.run()
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/279810.html