前言
主程序为单进程单线程,当遇到了某些比较耗时的操作时,会卡住执行流程,非常影响效率。而引进多进程或多线程,则能在一定程度上缓解这种情况。
多进程
多线程
多线程
import threadpool
THREAD_POOL_SIZE = 4
def execute_thread(func, args_list, pools=4, force_pool=False):
"""
多线程
:param func: 单线程的执行方法
:type func: 方法名
:param args_list: 单线程的参数组成的数组
:type args_list: [[(args1, args2,), {'key1': value1, 'key2': value2}], ]
:param pools: 线程池数量
:type pools: int
:param force_pool: 当pools大于设定的最大限制时,是否强制使用pools
:type force_pool: bool
:return: func返回的结果组成的列表,按args_list的顺序
:rtype:
"""
if pools > THREAD_POOL_SIZE and not force_pool:
pools = THREAD_POOL_SIZE
thread_pool = threadpool.ThreadPool(pools)
result_list = [None] * len(args_list)
# 构造不定参数
def tmp_f(item):
args = item[0] if any([isinstance(item[0], tuple), isinstance(item[0], list)]) else []
kwargs = item[-1] if isinstance(item[-1], dict) else {}
return func(*args, **kwargs)
# 构造回调函数
def callback(req, result):
result_list[task_list.index(req)] = result
task_list = threadpool.makeRequests(tmp_f, args_list, callback)
[thread_pool.putRequest(task) for task in task_list]
# task_pool.poll()
thread_pool.wait()
return result_list
多协程
多协程
import gevent
from gevent.pool import Pool
from gevent import monkey
GEVENT_POOL_SIZE = 4
def execute_event(func, args_list, pools=4, force_pool=False):
"""
多协程
:param func: 单协程的执行方法
:type func: 方法名
:param args_list: 单协程的参数组成的数组
:type args_list: [[(args1, args2,), {'key1': value1, 'key2': value2}], ]
:param pools: 协程池数量
:type pools: int
:param force_pool: 当pools大于设定的最大限制时,是否强制使用pools
:type force_pool: bool
:return: func返回的结果组成的列表,按args_list的顺序
:rtype:
"""
monkey.patch_socket() # 识别IO阻塞
if pools > GEVENT_POOL_SIZE and not force_pool:
pools = GEVENT_POOL_SIZE
gevent_pool = gevent.pool.Pool(pools)
def tmp_f(item):
args = item[0] if any([isinstance(item[0], tuple), isinstance(item[0], list)]) else []
kwargs = item[-1] if isinstance(item[-1], dict) else {}
return func(*args, **kwargs)
task_list = []
for item in args_list:
task_list.append(gevent_pool.spawn(tmp_f, item))
gevent.joinall(task_list)
result_list = []
for task in task_list:
result_list.append(task.value)
return result_list
测试
测试
from urllib import request
import time
from pools import execute_thread
from pools import execute_event
def wget(value):
url, t = value
start = time.time()
print('GET: %s' % url)
# time.sleep(t) # 不属于IO卡顿,可用gevent.sleep(t)替换
resp = request.urlopen(url)
data = resp.read()
end = time.time()
print(f'{len(data)} bytes received from {url}', f'消耗时间:{int(end) - int(start)}')
return url
urls = [
['https://www.baidu.com/', 3],
['https://www.python.org/', 5],
['https://github.com/', 7],
]
# 单线程
now = time.time()
result_list = []
for n in urls:
result = wget(n)
result_list.append(result)
print("单线程抓取使用时间:", time.time() - now)
print('单线程的返回:', result_list)
print('/n')
# 多线程
now_thread = time.time()
task_threads = []
result_threads = execute_thread(wget, [[(url, )] for url in urls])
print("多线程抓取使用时间:", time.time() - now_thread)
print('多线程的返回:', result_threads)
print('/n')
# 协程
now_gevent = time.time()
result_gevent = execute_event(wget, [[(url, )] for url in urls])
print("协程抓取使用时间:", time.time() - now_gevent)
print('协程的返回:', result_gevent)
print('/n')
原创文章,作者:745907710,如若转载,请注明出处:https://blog.ytso.com/tech/aiops/267137.html