进程与线程


前言

主程序为单进程单线程,当遇到了某些比较耗时的操作时,会卡住执行流程,非常影响效率。而引进多进程或多线程,则能在一定程度上缓解这种情况。

多进程

多线程

多线程
import threadpool

THREAD_POOL_SIZE = 4


def execute_thread(func, args_list, pools=4, force_pool=False):
    """
    多线程
    :param func: 单线程的执行方法
    :type func: 方法名
    :param args_list: 单线程的参数组成的数组
    :type args_list: [[(args1, args2,), {'key1': value1, 'key2': value2}], ]
    :param pools: 线程池数量
    :type pools: int
    :param force_pool: 当pools大于设定的最大限制时,是否强制使用pools
    :type force_pool: bool
    :return: func返回的结果组成的列表,按args_list的顺序
    :rtype:
    """
    if pools > THREAD_POOL_SIZE and not force_pool:
        pools = THREAD_POOL_SIZE
    thread_pool = threadpool.ThreadPool(pools)
    result_list = [None] * len(args_list)

    # 构造不定参数
    def tmp_f(item):
        args = item[0] if any([isinstance(item[0], tuple), isinstance(item[0], list)]) else []
        kwargs = item[-1] if isinstance(item[-1], dict) else {}
        return func(*args, **kwargs)

    # 构造回调函数
    def callback(req, result):
        result_list[task_list.index(req)] = result

    task_list = threadpool.makeRequests(tmp_f, args_list, callback)
    [thread_pool.putRequest(task) for task in task_list]
    # task_pool.poll()
    thread_pool.wait()
    return result_list

多协程

多协程
import gevent
from gevent.pool import Pool
from gevent import monkey

GEVENT_POOL_SIZE = 4


def execute_event(func, args_list, pools=4, force_pool=False):
    """
    多协程
    :param func: 单协程的执行方法
    :type func: 方法名
    :param args_list: 单协程的参数组成的数组
    :type args_list: [[(args1, args2,), {'key1': value1, 'key2': value2}], ]
    :param pools: 协程池数量
    :type pools: int
    :param force_pool: 当pools大于设定的最大限制时,是否强制使用pools
    :type force_pool: bool
    :return: func返回的结果组成的列表,按args_list的顺序
    :rtype:
    """
    monkey.patch_socket()       # 识别IO阻塞

    if pools > GEVENT_POOL_SIZE and not force_pool:
        pools = GEVENT_POOL_SIZE
    gevent_pool = gevent.pool.Pool(pools)

    def tmp_f(item):
        args = item[0] if any([isinstance(item[0], tuple), isinstance(item[0], list)]) else []
        kwargs = item[-1] if isinstance(item[-1], dict) else {}
        return func(*args, **kwargs)
    task_list = []
    for item in args_list:
        task_list.append(gevent_pool.spawn(tmp_f, item))
    gevent.joinall(task_list)

    result_list = []
    for task in task_list:
        result_list.append(task.value)
    return result_list

测试

测试
from urllib import request
import time

from pools import execute_thread
from pools import execute_event


def wget(value):
    url, t = value
    start = time.time()
    print('GET: %s' % url)
    # time.sleep(t)               # 不属于IO卡顿,可用gevent.sleep(t)替换
    resp = request.urlopen(url)
    data = resp.read()
    end = time.time()
    print(f'{len(data)} bytes received from {url}', f'消耗时间:{int(end) - int(start)}')
    return url


urls = [
    ['https://www.baidu.com/', 3],
    ['https://www.python.org/', 5],
    ['https://github.com/', 7],
]

# 单线程
now = time.time()
result_list = []
for n in urls:
    result = wget(n)
    result_list.append(result)
print("单线程抓取使用时间:", time.time() - now)
print('单线程的返回:', result_list)
print('/n')

# 多线程
now_thread = time.time()
task_threads = []
result_threads = execute_thread(wget, [[(url, )] for url in urls])
print("多线程抓取使用时间:", time.time() - now_thread)
print('多线程的返回:', result_threads)
print('/n')

# 协程
now_gevent = time.time()
result_gevent = execute_event(wget, [[(url, )] for url in urls])
print("协程抓取使用时间:", time.time() - now_gevent)
print('协程的返回:', result_gevent)
print('/n')

原创文章,作者:745907710,如若转载,请注明出处:https://blog.ytso.com/tech/aiops/267137.html

(0)
上一篇 2022年6月14日 18:49
下一篇 2022年6月14日 18:49

相关推荐

发表回复

登录后才能评论