十二、多任务
12.1 进程
'''
并发和并行:
并发:当有多个线程在操作时,系统只有一个CPU,并不能真正实现多任务同时进行,它是把CPU划分成若干个时间段,任务顺序执行
并行:当系统有2个及以上CPU,则线程操作有可能并发,当一个CPU 执行一个线程,另一个CPU执行另一个线程,互不抢占资源
实现多任务的方式:
多进程模式 : 进程是程序的实体,对操作系统来说,一个任务就是一个进程。如打开2个记事本就启动2个进程
优点:稳定性高,一个进程崩溃了,不会影响其他进程
缺点:创建进程开销大,操作系统同时运行进程的数量有限
进程创建: linux下使用os模块fork函数来创建,windows下使用 multiprocessing模块中的process类来创建
from multiprocessing import Process
process = Process(target=函数, name= 进程的名字,args=(给函数传递的参数))
process 对象
对象调用方法:
process.start() 启动进程并执行任务
process.run() 只启动了进程,没有执行任务
terminate() 终止
多线程模式
协程
进程 --> 线程 --> 协程
'''
#进程创建
import os
from multiprocessing import Process
from time import sleep
def task1():
while True:
sleep(1)
print('This is task1', os.getpid(),'----->',os.getppid())
def task2():
while True:
sleep(1)
print('This is task2', os.getpid(),'----->',os.getppid())
if __name__ == '__main__':
print(os.getpid())
#子进程1
p = Process(target=task1,name='任务1')
p.start()
print(p.name)
#子进程2
p1 = Process(target=task2,name='任务2')
p1.start()
print(p1.name)
带参数
#进程创建
import os
from multiprocessing import Process
from time import sleep
def task1(s,name):
while True:
sleep(s)
print('This is task1', os.getpid(),'----->',os.getppid(),name)
def task2(s,name):
while True:
sleep(s)
print('This is task2', os.getpid(),'----->',os.getppid(),name)
number = 1
if __name__ == '__main__':
print(os.getpid())
p = Process(target=task1,name='任务1',args=(1,'测试任务1'))
p.start()
print(p.name)
p1 = Process(target=task2,name='任务2',args=(1,'测试任务2'))
p1.start()
print(p1.name)
while True:
number += 1
sleep(0.2)
if number == 100:
p.terminate()
p1.terminate()
break
else:
print(number)
print('************')
12.2 多进程对全局变量的访问
'''
多进程对全局变量的访问:在每一个全局变量里都放一个m变量,保证每个进程访问变量互不干扰
每个进程都独立获得一份全局变量,互不影响各自的修改
'''
#进程创建
import os
from multiprocessing import Process
from time import sleep
m= 1 #不可变类型
list1= [] #可变类型
def task1(s,name):
global m
while True:
m+=1
list1.append(str(m)+'task1')
sleep(s)
print('This is task1---->',list1) #This is task1----> ['2task1', '3task1', '4task1', '5task1', '6task1', '7task1']
def task2(s,name):
global m
while True:
m+=1
list1.append(str(m)+'task2') #This is task2----> ['2task2', '3task2', '4task2', '5task2', '6task2', '7task2']
sleep(s)
print('This is task2---->',list1)
number = 1
if __name__ == '__main__':
print(os.getpid())
p = Process(target=task1,name='任务1',args=(1,'测试任务1'))
p.start()
p1 = Process(target=task2,name='任务2',args=(1,'测试任务2'))
p1.start()
while True:
m += 1
list1.append((str(m)+'main'))
sleep(0.5)
print('This is main:---->', list1) #This is main:----> ['2main', '3main', '4main', '5main', '6main', '7main', '8main', '9main', '10main', '11main', '12main', '13main', '14main']
12.3 自定义进程
#进程:自定义
import os
from multiprocessing import Process
from time import sleep
class MyProcess(Process):
def __init__(self,name):
super(MyProcess,self).__init__()
self.name = name
#重新run方法
def run(self):
n = 1
while True:
print(f'{self.name}---> 自定义进程,n:{n}')
n += 1
if __name__ == '__main__':
p = MyProcess('测试任务1')
p.start() # 1, 先开一个新的进程,2,调用run()方法
p1 = MyProcess('测试任务2')
p1.start()
12.4 进程池的非阻塞/阻塞模式
非阻塞
'''
使用multiprocessing模块提供的pool方法:
初始化pool时可以指定一个最大进程数,当有新的请求提交到pool中时,如果池子还没满,就会创建一个新的进程来执行该请求
如果pool中进程数已达到指定的最大值,那么该请求就会等待,知道pool中有进程结束,才会创建新的进程来执行
非阻塞式: 全部添加到队列,立刻返回,并没有等待其它进程执行完毕才会结束。但是回调函数,等待任务完成后才去调用
阻塞式:
'''
import os
import time
from multiprocessing import Pool
from random import random
from time import sleep
#非阻塞
def task(task_name):
print('开始做任务:',task_name)
start = time.time()
#使用sleep
time.sleep(random()*2) #2s以内
end = time.time()
# print('完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid())
return '完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid()
container = []
def callback_func(n):
container.append(n)
if __name__ == '__main__':
pool = Pool(5)
tasks = ['唱歌','跳舞','听音乐','画画','练字','吹唢呐','听戏','游泳','跑步']
for k in tasks:
pool.apply_async(task,args=(k,),callback=callback_func)
pool.close() # 添加任务结束
pool.join() #任务结束前,阻止回到主进程
for c in container:
print(c)
print('over!!')
阻塞
'''
阻塞式:顺序执行,添加一个任务,执行一个任务,上一个任务不执行完成,下一个任务不会开始
进程池:
pool = Pool(max) 创建进程池对象
pool.apply() 阻塞式
pool.apply_async() 非阻塞式
pool.close() 添加任务结束
pool.join() 让主进程让步,插队
'''
import os
import time
from multiprocessing import Pool
from random import random
#非阻塞
def task(task_name):
print('开始做任务:',task_name)
start = time.time()
#使用sleep
time.sleep(random()*2) #2s以内
end = time.time()
print('完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid())
# return '完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid()
# container = []
# def callback_func(n):
# container.append(n)
if __name__ == '__main__':
pool = Pool(5)
tasks = ['唱歌','跳舞','听音乐','画画','练字','吹唢呐','听戏','游泳','跑步']
for k in tasks:
pool.apply(task,args=(k,)) #阻塞式
pool.close() # 添加任务结束
pool.join() #任务结束前,阻止回到主进程
print('over!!!!')
队列
from multiprocessing import Queue
q = Queue(5) #设置队列长度为5
q.put('A')
q.put('B')
q.put('C')
q.put('D',timeout=2) #加timeout,超时不等
q.put('E') #put()如果队列满了,就只能等待,除非有空地,则添加成功
print(q.qsize())
if not q.full(): #q.full()判断队列是否已满,q.empty()判断队列是否是空的
q.put('F')
else:
print('Queue is full')
#获取队列的值
print(q.get(timeout=2))
print(q.get(timeout=2))
print(q.get(timeout=2))
print(q.get(timeout=2))
print(q.get(timeout=2))
# print(q.get(timeout=2))
#不等待,这样就不阻塞
q.put_nowait('G')
print(q.get_nowait())
12.5 进程间的通讯
'''
进程间通讯:
通过使用公共对象q = Queue(5),保证任务在同一个队列里面
'''
from multiprocessing import Process,Queue
from time import sleep
#将队列q作为对象传递
def download(q):
images = ['girl.jpg','boy.jpg','lady.jpg']
for image in images:
print("Downloading:",image)
sleep(0.5)
q.put(image)
def getfile(q):
while True:
try:
file = q.get(timeout=2) #超时2秒无内容则不去取
print(f'{file} save success!!')
except:
print('All file downloaded successfully!!')
break
if __name__ == '__main__':
q = Queue(5)
#q最为参数
p1 = Process(target=download,args=(q,))
p2 = Process(target=getfile,args=(q,))
p1.start()
p1.join()
p2.start()
p2.join()
print('验证任务完成,回到主线程!')
12.6 多线程
线程状态:
'''
多线程:
进程 :是一个正在执行中的程序,每一个进程执行都有一个执行顺序,该顺序是一个执行路径,或者叫一个控制单元;
线程:就是进程中的一个独立控制单元,线程在控制着进程的执行。一个进程中至少有一个进程。
多线程:一个进程中不只有一个线程。
如何创建和使用线程:
调用threading函数来创建线程对象
线程的状态:新建 --> 就绪 --> 运行 --> 结束
|
阻塞
'''
import threading
from time import sleep
#进程 Process
#线程 Thread
def download(n):
images = ['girl.jpg','boy.jpg','lady.jpg']
for image in images:
print("Downloading:",image)
sleep(n)
print(f'{image}保存成功!!')
def listenMusic(n):
musics = ['My heart will go on','My love','吻别','梦回唐朝']
for music in musics:
sleep(n)
print(f'正在听{music}!!')
if __name__ == '__main__':
#创建线程对象
t = threading.Thread(target=download,name='aa',args=(1,))
t.start()
t1 = threading.Thread(target=listenMusic,name='bb',args=(1,))
t1.start()
# n = 1
# while True:
# print(n)
# n+=1
# sleep(1.5)
12.6.1 多线程共享全局变量
'''
线程可以共享全局变量
GIL 全局解释器锁
python底层只要用线程就默认加锁 GIL
线程异步 --> 共享数据会导致数据不安全
线程同步 --> 导致的是效率低,但数据安全
运算数据量大时,会自动释放锁GIL
线程:耗时操作时用,爬虫,IO
进程:计算密集型
'''
import threading
from time import sleep
#进程 Process
#线程 Thread
money = 1000
def run1():
global money
for i in range(100):
sleep(0.1)
money -= 1
# def run2():
# global money
# for i in range(100):
# money -= 1
if __name__ == '__main__':
#创建线程对象 4个线程
t = threading.Thread(target=run1,name='th1')
t1 = threading.Thread(target=run1,name='th2')
t2 = threading.Thread(target=run1,name='th3')
t3 = threading.Thread(target=run1,name='th4')
t.start()
t1.start()
t2.start()
t3.start()
#阻塞
t.join()
t1.join()
t2.join()
t3.join()
print('money:',money) #600, 共享同一个全局变量
当计算大数据量时,自动释放锁
import threading
from time import sleep
#进程 Process
#线程 Thread
n = 0
def task1():
global n
for i in range(1000000):
n += 1
print('------->task1:',n)
print('------------')
sleep(1)
def task2():
global n
for i in range(1000000):
n += 1
print('------->task2:',n)
print('------------')
sleep(1)
if __name__ == '__main__':
#创建线程对象 4个线程
t = threading.Thread(target=task1,name='th1')
t1 = threading.Thread(target=task2,name='th3')
t.start()
t1.start()
#阻塞
t.join()
t1.join()
print('over')
#运行结果
------->task1: 1000000
------------
------->task2: 1647975 # 释放了锁
------------
over
12.7 多线程同步
'''
共享数据存在不安全性:
如果多个线程同时对数据进行修改,则可能出现不可预料的结果,为了保证数据的正确性,需要的对多个线程进行同步
同步:一个接一个的来做任务,一个做完,另一个才能进来,效率降低
使用Thread对象的lock和Rlock可以实现简单的线程同步,这2个对象都有acquire和release 方法,对应那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release之间。
多线程的优势在于可以同时运行多个任务,但是当线程需要共享数据时,可能存在数据不同步的问题,为了避免这种情况,引入了锁的概念。
lock = threading.Lock()
lock.acquire() #请求得到锁
........
lock.release() #释放锁
只要不释放锁,其他线程都无法进入运行状态
'''
import threading
from time import sleep
import random
#加锁
lock = threading.Lock()
list1 = [0] *10 #列表里有10个0
def task1():
#获取线程锁,如果已经上锁,则等待锁的释放
lock.acquire() #阻塞
for i in range(len(list1)):
list1[i] = 1
sleep(0.5)
lock.release() #释放
def task2():
#获取线程锁,如果已经上锁,则等待锁的释放
lock.acquire() #阻塞
for i in range(len(list1)):
print('--->',list1[i])
sleep(0.5)
lock.release() #释放
if __name__ == '__main__':
t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)
t2.start()
t1.start()
t2.join()
t1.join()
print(list1)
12.8 死锁
'''
死锁:
开发过程中使用线程,在线程共享多个资源时,如果2个线程分别占有一部分资源,并且同时等待对方的资源,就会造成死锁
尽管死锁很少发生,但一旦发生就会造成应用的停止响应,程序不做任何事情
避免出现死锁的方法:资源分配不当导致死锁
1,重构代码
2,acquire() 加timeout,利用时间差来释放锁
'''
from threading import Thread,Lock
from time import sleep
lockA = Lock()
lockB = Lock()
class MyThread1(Thread):
def run(self):#start
if lockA.acquire():#如果可以获取到锁,则返回True
print(self.name + '获取到了A锁')
sleep(0.2)
if lockB.acquire(timeout=4): #阻塞
print(self.name + '又试图获取到了B锁,原来还有A锁的存在') #因为有sleep,A锁已经被线程1占有
lockB.release()
lockA.release()
class MyThread2(Thread):
def run(self):#start
if lockB.acquire():#如果可以获取到锁,则返回True
print(self.name + '获取到了B锁')
sleep(0.2)
if lockA.acquire(timeout=2):#阻塞
print(self.name + '又试图获取到了A锁,原来还有B锁的存在') #因为有sleep,A锁已经被线程1占有
lockA.release()
lockB.release()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
#运行结果
Thread-1获取到了A锁
Thread-2获取到了B锁
Thread-1又试图获取到了B锁,原来还有A锁的存在
12.9 生产者与消费者
'''
生产者与消费者:两个线程之间的通讯
python的queue模块中提供了同步的,线程安全的队列类,包括FIFO(先进先出)队列Queue,LIFO(后进先出)队列LifoQueue,和
优先级队列PriorityQueue,这些都实现了锁原理(可以理解为原子操作,要么不做,要么做完)。能够在多线程中直接使用。
可以使用队列来实现线程间的同步。
'''
import threading
import queue
import time
import random
def produce(q):
i=0
while i<10:
num = random.randint(1,100)
q.put('生产者生产数据:%d'%num)
print('生产者生产数据:%d'%num)
time.sleep(1)
i+=1
q.put(None)
#完成任务
q.task_done()
def consume(q):
while True:
item = q.get()
arr.append(item)
if item is None:
break
print(f'消费者获取到:{arr}')
time.sleep(4)
#完成任务
q.task_done()
if __name__ == '__main__':
q = queue.Queue(10)
arr = []
#创建生产者
th = threading.Thread(target=produce,args=(q,))
th.start()
#创建消费者
tc = threading.Thread(target=consume,args=(q,))
tc.start()
th.join()
tc.join()
print('over!!')
12.10 协程
使用生成器来完成协程任务
'''
协程:微线程
进程 --》线程 --》 协程
耗时操作--> 用协程
网络请求
网络下载(下载)
IO 操作:文件读写
阻塞动作
生成器:yield
'''
import time
def task1():
for i in range(3):
print('A'+ str(i))
yield
time.sleep(0.1)
def task2():
for i in range(3):
print('B'+ str(i))
yield
time.sleep(0.1)
if __name__ == '__main__':
g1 = task1()
g2 = task1()
while True:
try:
next(g1)
next(g2)
except:
break
使用greenlet来完成协程任务
'''
greenlet : 完成协程任务
'''
import time
from greenlet import greenlet
def task1():
for i in range(3):
print('A'+ str(i))
g2.switch()
time.sleep(0.1)
def task2():
for i in range(3):
print('B'+ str(i))
g3.switch()
time.sleep(0.1)
def task3():
for i in range(3):
print('C'+ str(i))
g1.switch()
time.sleep(0.1)
if __name__ == '__main__':
g1 = greenlet(task1)
g2 = greenlet(task2)
g3 = greenlet(task3)
g1.switch()
使用gevent来完成协程任务
'''
gevent:
greenlet已经实现了协程任务,但是需要人工切换,不是很智能
gevent 完美解决greenlet的不足
原理:
当一个greenlet遇到IO(input output,比如网络,文件操作等)操作时,就自动切换到其他greenlet,等到IO完成,再切回来继续执行
由于IO 操作非常耗时,经常使程序处于等待状态,有了gevent就可以自动完成切换协程,保证了总有greenlet在运行,而不是等待IO
'''
import time
from greenlet import greenlet
import gevent
from gevent import monkey
#猴子补丁,替换了sleep的动作,不然就是单个函数顺序执行
monkey.patch_all()
def task1():
for i in range(3):
print('A'+ str(i))
time.sleep(0.1)
def task2():
for i in range(3):
print('B'+ str(i))
time.sleep(0.1)
def task3():
for i in range(3):
print('C'+ str(i))
time.sleep(0.1)
if __name__ == '__main__':
g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)
g3 = gevent.spawn(task3)
g1.join()
g2.join()
g2.join()
print('---------')
案例
import time,requests
import gevent
from gevent import monkey
import urllib.request
#猴子补丁,检测到耗时操作则切换
monkey.patch_all()
def download(url):
# response = requests.get(url)
# content = response.text
response = urllib.request.urlopen(url)
content = response.read()
print(f'下载了{url}的数据,长度{len(content)}')
if __name__ == '__main__':
urls = ['http://www.baidu.com','http://mail.126.com','http://cn.bing.com/?mkt=zh-CN',]
g1 = gevent.spawn(download,urls[0])
g2 = gevent.spawn(download,urls[1])
g3 = gevent.spawn(download,urls[2])
g1.join()
g2.join()
g3.join()
print('---------')
原创文章,作者:506227337,如若转载,请注明出处:https://blog.ytso.com/278461.html