【并发编程】第3回 线程与协程


目录

1. 验证GIL的存在

1.1 验证GIL是否存在

  1. 同一个进程下的多个线程无法同时执行,单进程下的多线程无法利用多核优势,效率低.
from threading import Thread

money = 100

def task():
    global money
    money -= 1

t_list = []
for i in range(100):
    t = Thread(target=task)
    t.start()
    t_list.append(t)  # [线程1,线程2, 线程3 ...]
for t in t_list:
    t.join()

# 等待所有的线程运行结束 查看money是多少
print(money)

【并发编程】第3回 线程与协程

from threading import Thread
import time

money = 100


def task():
    global money
    tmp = money
    time.sleep(0.1)
    money = tmp - 1


t_list = []
for i in range(100):
    t = Thread(target=task)
    t.start()
    t_list.append(t)  # [线程1,线程2, 线程3 ...]
for t in t_list:
    t.join()

# 等待所有的线程运行结束 查看money是多少
print(money)

【并发编程】第3回 线程与协程

1.2 针对不同的数据应该加不同的锁处理

'''GIL不会影响程序层面的数据也不会保证它的修改是安全的要想保证得自己加锁'''
from threading import Thread,Lock
import time

money = 100
mutex = Lock()

def task():
    mutex.acquire()
    global money
    tmp = money
    time.sleep(0.1)
    money = tmp - 1
    mutex.release()



t_list = []
for i in range(100):
    t = Thread(target=task)
    t.start()
    t_list.append(t)  # [线程1 线程2 线程3 ... 线程100]
for t in t_list:
    t.join()
# 等待所有的线程运行结束 查看money是多少
print(money)

2. 验证python多线程是否有用

2.1 单个CPU IO密集型(代码有IO操作)

  1. 多进程:申请额外的空间,消耗更多的资源
  2. 多线程:消耗资源相对较少,通过多道技术
  3. 总结:多线程有优势

2.2 单个CPU 计算机密集(代码没用IO)

  1. 多进程:申请额外的空间,消耗更多的资源(总耗时+申请空间+拷贝代码+切换)
  2. 多线程:消耗资源相对较少,通过多道技术(总耗时+切换)
  3. 总结:多线程有优势

2.3 多个CPU IO密集型(代码有IO操作)

  1. 多进程:总耗时(单个进程的耗时+IO+申请空间+拷贝代码)
  2. 多线程:总耗时(单个进程的耗时+IO)
  3. 总结:多线程有优势

2.4 多个CPU 计算机密集(代码没有IO)

  1. 多进程:总耗时(单个进程的耗时)
  2. 多线程:总耗时(多个进程的综合)
  3. 总结:多进程完胜

2.5 代码案例

  1. 计算密集型
from threading import Thread
from multiprocessing import Process
import os
import time


def work():
    # 计算密集型
    res = 1
    for i in range(1, 100000):
        res *= i


if __name__ == '__main__':
    print(os.cpu_count())  # 12  查看当前计算机CPU个数
    start_time = time.time()
    # p_list = []
    # for i in range(12):  # 一次性创建12个进程
    #     p = Process(target=work)
    #     p.start()
    #     p_list.append(p)
    # for p in p_list:  # 确保所有的进程全部运行完毕
    #     p.join()
    t_list = []
    for i in range(12):
        t = Thread(target=work)
        t.start()
        t_list.append(t)
    for t in t_list:
        t.join()
    print('总耗时:%s' % (time.time() - start_time))  # 获取总的耗时

【并发编程】第3回 线程与协程
【并发编程】第3回 线程与协程
2. IO密集型

def work():
    time.sleep(2)   # 模拟纯IO操作


if __name__ == '__main__':
    start_time = time.time()
    # t_list = []
    # for i in range(100):
    #     t = Thread(target=work)
    #     t.start()
    # for t in t_list:
    #     t.join()
    p_list = []
    for i in range(100):
        p = Process(target=work)
        p.start()
    for p in p_list:
        p.join()
    print('总耗时:%s' % (time.time() - start_time))

【并发编程】第3回 线程与协程
【并发编程】第3回 线程与协程

3. 死锁现象

3.1 定义

  1. 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程

3.2 代码案例

from threading import Thread, Lock
import time

mutexA = Lock()  # 类名加括号每执行一次就会产生一个新的对象
mutexB = Lock()  # 类名加括号每执行一次就会产生一个新的对象


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print(f'{self.name}抢到了A锁')
        mutexB.acquire()
        print(f'{self.name}抢到了B锁')
        mutexB.release()
        print(f'{self.name}释放了B锁')
        mutexA.release()
        print(f'{self.name}释放了A锁')

    def func2(self):
        mutexB.acquire()
        print(f'{self.name}抢到了B锁')
        time.sleep(1)
        mutexA.acquire()
        print(f'{self.name}抢到了A锁')
        mutexA.release()
        print(f'{self.name}释放了A锁')
        mutexB.release()
        print(f'{self.name}释放了B锁')


for i in range(10):
    t = MyThread()
    t.start()

【并发编程】第3回 线程与协程

4. 信号量

4.1 本质

  1. 信号量的本质也是互斥锁,只不过它是多把锁

4.2 信号量在不同的知识体系中,意思可能有区别

  1. 在并发编程中,信号量就是多把互斥锁
  2. 在diango中,信号量指的是达到某个条件自动触发(中间件)

4.3 对比

  1. Lock产生的是单把锁,类似于单间厕所
  2. 信号量相当于一次性创建多间厕所,类似于公共厕所

4.4 代码案例

from threading import Thread, Lock, Semaphore
import time
import random

sp = Semaphore(5)  # 一次性产生五把锁


class MyThread(Thread):
    def run(self):
        sp.acquire()
        print(self.name)
        time.sleep(random.randint(1, 3))
        sp.release()


for i in range(20):
    t = MyThread()
    t.start()

【并发编程】第3回 线程与协程

5. event事件

5.1 定义

  1. 子进程/子线程(程序)之间可以彼此等待彼此
  2. eg:子 A运行到某一个代码位置后发信号告诉子B开始运行

5.2 代码案例

from threading import Thread, Event
import time

event = Event()  # 类似于造了一个红绿灯


def light():
    print('红灯亮着的 所有人都不能动')
    time.sleep(3)
    print('绿灯亮了 油门踩到底 给我冲!!!')
    event.set()


def car(name):
    print('%s正在等红灯' % name)
    event.wait()
    print('%s加油门 飙车了' % name)


t = Thread(target=light)
t.start()
for i in range(5):
    t = Thread(target=car, args=('熊猫PRO%s' % i,))
    t.start()

【并发编程】第3回 线程与协程

6. 进程池与线程池

6.1 判断

  1. 多进程,多线程在实际应用中是不是可以无限制的开进程和线程
    肯定不可以,会造成内存溢出受限于硬件水平,我们在开设多进程或者多线程的时候,还需要考虑硬件的承受范围

6.2 定义

  1. 池:降低程序的执行效率,保证计算机硬件的安全
  2. 进程池:提前创建好固定个数的进程供程序使用,后续不会再创建
  3. 线程池:提前创建好固定个数的线程供程序使用,后续不会再创建

6.3 线程池,传参submit(函数名,实参1,实参2..)也可以不传

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread

import time

pool = ThreadPoolExecutor(5)  # 固定产生五个线程

def task(n):
    print(current_thread().name)
    print(n)
    time.sleep(1)

for i in range(10):
    pool.submit(task,123)  # 朝池子中提交任务(异步)

【并发编程】第3回 线程与协程

6.4 同步操作,不能直接获取结果

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread

import time

pool = ThreadPoolExecutor(5)  # 固定产生五个线程

def task(n):
    print(current_thread().name)
    # print(n)
    time.sleep(1)

for i in range(10):
    res = pool.submit(task,123)  # 朝池子中提交任务(异步)
    print(res.result())  # 同步操作

【并发编程】第3回 线程与协程

6.5 异步回调:异步任务执行完成后有结果就会自动触发该机制

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread

import time

pool = ThreadPoolExecutor(5)  # 固定产生五个线程


def task(n):
    print(current_thread().name)
    # print(n)
    time.sleep(1)
    return '返回的结果'


def func(*args, **kwargs):
    print('func', args, kwargs)
    print(args[0].result())


for i in range(10):
    # res = pool.submit(task,123)  # 朝池子中提交任务(异步)
    # print(res.result())  # 同步操作
    pool.submit(task, 123).add_done_callback(func)
    """异步回调:异步任务执行完成后有结果就会自动触发该机制"""

【并发编程】第3回 线程与协程
【并发编程】第3回 线程与协程
【并发编程】第3回 线程与协程

6.6 进程池

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread
import os
import time

# pool = ThreadPoolExecutor(5)  # 固定产生五个线程
pool = ProcessPoolExecutor(5)  # 固定产生五个进程

def task(n):
    # print(current_thread().name)
    print(os.getpid())  # 进程号
    # print(n)
    time.sleep(1)
    return '返回的结果'


def func(*args, **kwargs):
    print('func', args, kwargs)
    print(args[0].result())


if __name__ == '__main__':
    for i in range(20):
        # res = pool.submit(task,123)  # 朝池子中提交任务(异步)
        # print(res.result())  # 同步操作
        pool.submit(task, 123).add_done_callback(func)
        """异步回调:异步任务执行完成后有结果就会自动触发该机制"""

【并发编程】第3回 线程与协程

7. 协程

7.1 本质

  1. 进程:资源单位
  2. 线程:执行单位
  3. 协程:单线程下实现并发(效率极高)
    在代码层面欺骗CPU,让CPU觉得我们的代码里面没有IO操作,实际上IO操作被我们自己写的代码检测,一旦有,立刻让代码执行别的(该技术完全是程序员自己弄出来的,名字也是程序员自己起的,核心:自己写代码完成切换+保存状态)

7.2 代码案例

import time
from gevent import monkey;monkey.patch_all()  # 固定编写,用于检测所有的IO操作(猴子补丁)
from gevent import spawn


def func1():
    print('func1 running')
    time.sleep(3)
    print('func1 over')


def func2():
    print('func2 running')
    time.sleep(5)
    print('func2 over')


if __name__ == '__main__':
    start_time = time.time()
    # func1()
    # func2()
    s1 = spawn(func1)  # 检测代码 一旦有IO自动切换(执行没有IO的操作,变相的等待IO结束)
    s2 = spawn(func2)
    s1.join()
    s2.join()
    print(time.time() - start_time)  # 8.077142477035522  # 5.078031539916992

【并发编程】第3回 线程与协程
【并发编程】第3回 线程与协程

7.3 协程实现TCP服务端并发

  1. 服务端代码
import socket
from gevent import monkey;monkey.patch_all()
from gevent import spawn


def communication(sock):
    while True:
        data = sock.recv(1024)
        print(data.decode('utf8'))
        sock.send(data.upper())


def get_server():
    server = socket.socket()
    server.bind(('127.0.0.1', 8080))
    server.listen(5)
    while True:
        sock, addr = server.accept()  # IO操作
        spawn(communication, sock)

s1 = spawn(get_server)
s1.join()
  1. 客户段代码
import socket
from threading import Thread,current_thread


def get_client():
    client = socket.socket()
    client.connect(('127.0.0.1', 8080))
    while True:
        client.send(f'hello {current_thread().name}'.encode('utf8'))
        data = client.recv(1024)
        print(data.decode('utf8'))


for i in range(400):
    t = Thread(target=get_client())
    t.start()

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/279999.html

(0)
上一篇 2022年8月11日
下一篇 2022年8月11日

相关推荐

发表回复

登录后才能评论