Python编程学习-基础笔记10


十二、多任务

12.1 进程

'''
并发和并行:
    并发:当有多个线程在操作时,系统只有一个CPU,并不能真正实现多任务同时进行,它是把CPU划分成若干个时间段,任务顺序执行
    并行:当系统有2个及以上CPU,则线程操作有可能并发,当一个CPU 执行一个线程,另一个CPU执行另一个线程,互不抢占资源

实现多任务的方式:
    多进程模式 : 进程是程序的实体,对操作系统来说,一个任务就是一个进程。如打开2个记事本就启动2个进程
        优点:稳定性高,一个进程崩溃了,不会影响其他进程
        缺点:创建进程开销大,操作系统同时运行进程的数量有限
        进程创建: linux下使用os模块fork函数来创建,windows下使用 multiprocessing模块中的process类来创建
        from multiprocessing import Process
        process = Process(target=函数, name= 进程的名字,args=(给函数传递的参数))
        process 对象
        对象调用方法:
            process.start() 启动进程并执行任务
            process.run() 只启动了进程,没有执行任务
            terminate() 终止

    多线程模式
    协程
    进程 --> 线程 --> 协程
'''

#进程创建
import os
from multiprocessing import Process
from time import  sleep
def task1():
    while True:
        sleep(1)
        print('This is task1', os.getpid(),'----->',os.getppid())

def task2():
    while True:
        sleep(1)
        print('This is task2', os.getpid(),'----->',os.getppid())

if __name__ == '__main__':
    print(os.getpid())
    #子进程1
    p = Process(target=task1,name='任务1')
    p.start()
    print(p.name)
    #子进程2
    p1 = Process(target=task2,name='任务2')
    p1.start()
    print(p1.name)

带参数

#进程创建
import os
from multiprocessing import Process
from time import  sleep
def task1(s,name):
    while True:
        sleep(s)
        print('This is task1', os.getpid(),'----->',os.getppid(),name)

def task2(s,name):
    while True:
        sleep(s)
        print('This is task2', os.getpid(),'----->',os.getppid(),name)
number = 1
if __name__ == '__main__':
    print(os.getpid())
    p = Process(target=task1,name='任务1',args=(1,'测试任务1'))
    p.start()
    print(p.name)
    p1 = Process(target=task2,name='任务2',args=(1,'测试任务2'))
    p1.start()
    print(p1.name)
    while True:
        number += 1
        sleep(0.2)
        if number == 100:
            p.terminate()
            p1.terminate()
            break
        else:
            print(number)

    print('************')

12.2 多进程对全局变量的访问

'''
多进程对全局变量的访问:在每一个全局变量里都放一个m变量,保证每个进程访问变量互不干扰
每个进程都独立获得一份全局变量,互不影响各自的修改
'''

#进程创建
import os
from multiprocessing import Process
from time import  sleep

m= 1 #不可变类型
list1= [] #可变类型
def task1(s,name):
    global m
    while True:
        m+=1
        list1.append(str(m)+'task1')
        sleep(s)
        print('This is task1---->',list1) #This is task1----> ['2task1', '3task1', '4task1', '5task1', '6task1', '7task1']

def task2(s,name):
    global m
    while True:
        m+=1
        list1.append(str(m)+'task2') #This is task2----> ['2task2', '3task2', '4task2', '5task2', '6task2', '7task2']
        sleep(s)
        print('This is task2---->',list1)
number = 1
if __name__ == '__main__':
    print(os.getpid())
    p = Process(target=task1,name='任务1',args=(1,'测试任务1'))
    p.start()
    p1 = Process(target=task2,name='任务2',args=(1,'测试任务2'))
    p1.start()
    while True:
        m += 1
        list1.append((str(m)+'main'))
        sleep(0.5)
        print('This is main:---->', list1) #This is main:----> ['2main', '3main', '4main', '5main', '6main', '7main', '8main', '9main', '10main', '11main', '12main', '13main', '14main']

12.3 自定义进程

#进程:自定义
import os
from multiprocessing import Process
from time import  sleep

class MyProcess(Process):
    def __init__(self,name):
        super(MyProcess,self).__init__()
        self.name = name
    #重新run方法
    def run(self):
        n = 1
        while True:
            print(f'{self.name}---> 自定义进程,n:{n}')
            n += 1

if __name__ == '__main__':
    p = MyProcess('测试任务1')
    p.start() # 1, 先开一个新的进程,2,调用run()方法
    p1 = MyProcess('测试任务2')
    p1.start()

12.4 进程池的非阻塞/阻塞模式

非阻塞

'''
使用multiprocessing模块提供的pool方法:
    初始化pool时可以指定一个最大进程数,当有新的请求提交到pool中时,如果池子还没满,就会创建一个新的进程来执行该请求
    如果pool中进程数已达到指定的最大值,那么该请求就会等待,知道pool中有进程结束,才会创建新的进程来执行

非阻塞式: 全部添加到队列,立刻返回,并没有等待其它进程执行完毕才会结束。但是回调函数,等待任务完成后才去调用
阻塞式:
'''
import os
import time
from multiprocessing import Pool
from random import random
from time import  sleep

#非阻塞
def task(task_name):
    print('开始做任务:',task_name)
    start = time.time()
    #使用sleep
    time.sleep(random()*2) #2s以内
    end = time.time()
    # print('完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid())
    return '完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid()
container = []
def callback_func(n):
    container.append(n)

if __name__ == '__main__':
    pool = Pool(5)
    tasks = ['唱歌','跳舞','听音乐','画画','练字','吹唢呐','听戏','游泳','跑步']
    for k in tasks:
        pool.apply_async(task,args=(k,),callback=callback_func)
    pool.close() # 添加任务结束
    pool.join() #任务结束前,阻止回到主进程
    for c in container:
        print(c)

    print('over!!')

阻塞

'''
阻塞式:顺序执行,添加一个任务,执行一个任务,上一个任务不执行完成,下一个任务不会开始

进程池:
    pool = Pool(max) 创建进程池对象
    pool.apply()    阻塞式
    pool.apply_async() 非阻塞式
    pool.close() 添加任务结束
    pool.join() 让主进程让步,插队
'''
import os
import time
from multiprocessing import Pool
from random import random

#非阻塞
def task(task_name):
    print('开始做任务:',task_name)
    start = time.time()
    #使用sleep
    time.sleep(random()*2) #2s以内
    end = time.time()
    print('完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid())
    # return '完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid()
# container = []
# def callback_func(n):
#     container.append(n)

if __name__ == '__main__':
    pool = Pool(5)
    tasks = ['唱歌','跳舞','听音乐','画画','练字','吹唢呐','听戏','游泳','跑步']
    for k in tasks:
        pool.apply(task,args=(k,)) #阻塞式
    pool.close() # 添加任务结束
    pool.join() #任务结束前,阻止回到主进程

    print('over!!!!')

队列

from multiprocessing import Queue
q = Queue(5) #设置队列长度为5

q.put('A')
q.put('B')
q.put('C')
q.put('D',timeout=2) #加timeout,超时不等
q.put('E') #put()如果队列满了,就只能等待,除非有空地,则添加成功
print(q.qsize())
if not q.full(): #q.full()判断队列是否已满,q.empty()判断队列是否是空的
    q.put('F')
else:
    print('Queue is full')
#获取队列的值
print(q.get(timeout=2))
print(q.get(timeout=2))
print(q.get(timeout=2))
print(q.get(timeout=2))
print(q.get(timeout=2))
# print(q.get(timeout=2))
#不等待,这样就不阻塞
q.put_nowait('G')
print(q.get_nowait())

12.5 进程间的通讯

'''
进程间通讯:
    通过使用公共对象q = Queue(5),保证任务在同一个队列里面

'''
from multiprocessing import Process,Queue
from time import sleep

#将队列q作为对象传递
def download(q):
    images = ['girl.jpg','boy.jpg','lady.jpg']
    for image in images:
        print("Downloading:",image)
        sleep(0.5)
        q.put(image)

def getfile(q):
    while True:
        try:
            file = q.get(timeout=2) #超时2秒无内容则不去取
            print(f'{file} save success!!')
        except:
            print('All file downloaded successfully!!')
            break


if __name__ == '__main__':
    q = Queue(5)
    #q最为参数
    p1 = Process(target=download,args=(q,))
    p2 = Process(target=getfile,args=(q,))
    p1.start()
    p1.join()
    p2.start()
    p2.join()
    print('验证任务完成,回到主线程!')

12.6 多线程

线程状态:

Python编程学习-基础笔记10

'''
多线程:
    进程 :是一个正在执行中的程序,每一个进程执行都有一个执行顺序,该顺序是一个执行路径,或者叫一个控制单元;
    线程:就是进程中的一个独立控制单元,线程在控制着进程的执行。一个进程中至少有一个进程。
    多线程:一个进程中不只有一个线程。
如何创建和使用线程:
    调用threading函数来创建线程对象
    线程的状态:新建 --> 就绪 --> 运行 --> 结束
                               |
                               阻塞
'''

import threading

from time import sleep
#进程 Process
#线程 Thread

def download(n):
    images = ['girl.jpg','boy.jpg','lady.jpg']
    for image in images:
        print("Downloading:",image)
        sleep(n)
        print(f'{image}保存成功!!')

def listenMusic(n):
    musics = ['My heart will go on','My love','吻别','梦回唐朝']
    for music in musics:
        sleep(n)
        print(f'正在听{music}!!')

if __name__ == '__main__':
    #创建线程对象
    t = threading.Thread(target=download,name='aa',args=(1,))
    t.start()
    t1 = threading.Thread(target=listenMusic,name='bb',args=(1,))
    t1.start()
    # n = 1
    # while True:
    #     print(n)
    #     n+=1
    #     sleep(1.5)

12.6.1 多线程共享全局变量

'''
线程可以共享全局变量
GIL 全局解释器锁
    python底层只要用线程就默认加锁 GIL
    线程异步 --> 共享数据会导致数据不安全
    线程同步 --> 导致的是效率低,但数据安全
    运算数据量大时,会自动释放锁GIL
线程:耗时操作时用,爬虫,IO
进程:计算密集型
'''
import threading

from time import sleep
#进程 Process
#线程 Thread
money = 1000
def run1():
    global money
    for i in range(100):
        sleep(0.1)
        money -= 1
# def run2():
#     global money
#     for i in range(100):
#         money -= 1

if __name__ == '__main__':
    #创建线程对象 4个线程
    t = threading.Thread(target=run1,name='th1')
    t1 = threading.Thread(target=run1,name='th2')
    t2 = threading.Thread(target=run1,name='th3')
    t3 = threading.Thread(target=run1,name='th4')
    t.start()
    t1.start()
    t2.start()
    t3.start()
    #阻塞
    t.join()
    t1.join()
    t2.join()
    t3.join()
    print('money:',money)  #600, 共享同一个全局变量

当计算大数据量时,自动释放锁

import threading

from time import sleep
#进程 Process
#线程 Thread
n = 0
def task1():
    global n
    for i in range(1000000):
        n += 1
    print('------->task1:',n)
    print('------------')
    sleep(1)

def task2():
    global n
    for i in range(1000000):
        n += 1
    print('------->task2:',n)
    print('------------')
    sleep(1)
if __name__ == '__main__':
    #创建线程对象 4个线程
    t = threading.Thread(target=task1,name='th1')
    t1 = threading.Thread(target=task2,name='th3')
    t.start()
    t1.start()

    #阻塞
    t.join()
    t1.join()
    print('over')
#运行结果
------->task1: 1000000
------------
------->task2: 1647975 # 释放了锁
------------
over

12.7 多线程同步

'''
共享数据存在不安全性:
如果多个线程同时对数据进行修改,则可能出现不可预料的结果,为了保证数据的正确性,需要的对多个线程进行同步
同步:一个接一个的来做任务,一个做完,另一个才能进来,效率降低
使用Thread对象的lock和Rlock可以实现简单的线程同步,这2个对象都有acquire和release 方法,对应那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release之间。
多线程的优势在于可以同时运行多个任务,但是当线程需要共享数据时,可能存在数据不同步的问题,为了避免这种情况,引入了锁的概念。
lock = threading.Lock()
    lock.acquire() #请求得到锁
    ........
    lock.release() #释放锁
只要不释放锁,其他线程都无法进入运行状态
'''
import threading
from time import sleep
import random
#加锁
lock = threading.Lock()

list1 = [0] *10  #列表里有10个0

def task1():
    #获取线程锁,如果已经上锁,则等待锁的释放
    lock.acquire() #阻塞
    for i in range(len(list1)):
        list1[i] = 1
        sleep(0.5)
    lock.release() #释放

def task2():
    #获取线程锁,如果已经上锁,则等待锁的释放
    lock.acquire() #阻塞
    for i in range(len(list1)):
        print('--->',list1[i])
        sleep(0.5)
    lock.release() #释放

if __name__ == '__main__':
    t1 = threading.Thread(target=task1)
    t2 = threading.Thread(target=task2)
    t2.start()
    t1.start()
    t2.join()
    t1.join()
    print(list1)

12.8 死锁

'''
死锁:
开发过程中使用线程,在线程共享多个资源时,如果2个线程分别占有一部分资源,并且同时等待对方的资源,就会造成死锁
尽管死锁很少发生,但一旦发生就会造成应用的停止响应,程序不做任何事情
避免出现死锁的方法:资源分配不当导致死锁
    1,重构代码
    2,acquire() 加timeout,利用时间差来释放锁
'''
from threading import Thread,Lock
from time import sleep

lockA = Lock()
lockB = Lock()

class MyThread1(Thread):
    def run(self):#start
        if lockA.acquire():#如果可以获取到锁,则返回True
            print(self.name + '获取到了A锁')
            sleep(0.2)
            if lockB.acquire(timeout=4): #阻塞
                print(self.name + '又试图获取到了B锁,原来还有A锁的存在') #因为有sleep,A锁已经被线程1占有
                lockB.release()
            lockA.release()

class MyThread2(Thread):
    def run(self):#start
        if lockB.acquire():#如果可以获取到锁,则返回True
            print(self.name + '获取到了B锁')
            sleep(0.2)
            if lockA.acquire(timeout=2):#阻塞
                print(self.name + '又试图获取到了A锁,原来还有B锁的存在') #因为有sleep,A锁已经被线程1占有
                lockA.release()
            lockB.release()

if __name__ == '__main__':
    t1 = MyThread1()
    t2 = MyThread2()

    t1.start()
    t2.start()
#运行结果
Thread-1获取到了A锁
Thread-2获取到了B锁
Thread-1又试图获取到了B锁,原来还有A锁的存在

12.9 生产者与消费者

'''
生产者与消费者:两个线程之间的通讯
python的queue模块中提供了同步的,线程安全的队列类,包括FIFO(先进先出)队列Queue,LIFO(后进先出)队列LifoQueue,和
优先级队列PriorityQueue,这些都实现了锁原理(可以理解为原子操作,要么不做,要么做完)。能够在多线程中直接使用。
可以使用队列来实现线程间的同步。
'''

import threading
import queue
import time
import random
def produce(q):
    i=0
    while i<10:
        num = random.randint(1,100)
        q.put('生产者生产数据:%d'%num)
        print('生产者生产数据:%d'%num)
        time.sleep(1)
        i+=1
    q.put(None)
    #完成任务
    q.task_done()

def consume(q):
    while True:
        item = q.get()
        arr.append(item)
        if item is None:
            break
        print(f'消费者获取到:{arr}')
        time.sleep(4)

    #完成任务
    q.task_done()

if __name__ == '__main__':
    q = queue.Queue(10)
    arr = []

    #创建生产者
    th = threading.Thread(target=produce,args=(q,))
    th.start()

    #创建消费者
    tc = threading.Thread(target=consume,args=(q,))
    tc.start()

    th.join()
    tc.join()
    print('over!!')

12.10 协程

使用生成器来完成协程任务

'''
协程:微线程
进程 --》线程 --》 协程
耗时操作--> 用协程
    网络请求
    网络下载(下载)
    IO 操作:文件读写
    阻塞动作
生成器:yield
'''
import time

def task1():
    for i in range(3):
        print('A'+ str(i))
        yield
        time.sleep(0.1)
def task2():
    for i in range(3):
        print('B'+ str(i))
        yield
        time.sleep(0.1)

if __name__ == '__main__':
    g1 = task1()
    g2 = task1()
    while True:
        try:
            next(g1)
            next(g2)
        except:
            break

使用greenlet来完成协程任务

'''
greenlet : 完成协程任务
'''
import time
from greenlet import greenlet

def task1():
    for i in range(3):
        print('A'+ str(i))
        g2.switch()
        time.sleep(0.1)
def task2():
    for i in range(3):
        print('B'+ str(i))
        g3.switch()
        time.sleep(0.1)

def task3():
    for i in range(3):
        print('C'+ str(i))
        g1.switch()
        time.sleep(0.1)
if __name__ == '__main__':
    g1 = greenlet(task1)
    g2 = greenlet(task2)
    g3 = greenlet(task3)

    g1.switch()

使用gevent来完成协程任务

'''
gevent:
    greenlet已经实现了协程任务,但是需要人工切换,不是很智能
    gevent 完美解决greenlet的不足
原理:
    当一个greenlet遇到IO(input output,比如网络,文件操作等)操作时,就自动切换到其他greenlet,等到IO完成,再切回来继续执行
    由于IO 操作非常耗时,经常使程序处于等待状态,有了gevent就可以自动完成切换协程,保证了总有greenlet在运行,而不是等待IO

'''
import time
from greenlet import greenlet
import gevent
from gevent import monkey
#猴子补丁,替换了sleep的动作,不然就是单个函数顺序执行
monkey.patch_all()

def task1():
    for i in range(3):
        print('A'+ str(i))
        time.sleep(0.1)
def task2():
    for i in range(3):
        print('B'+ str(i))
        time.sleep(0.1)

def task3():
    for i in range(3):
        print('C'+ str(i))
        time.sleep(0.1)
if __name__ == '__main__':
    g1 = gevent.spawn(task1)
    g2 = gevent.spawn(task2)
    g3 = gevent.spawn(task3)

    g1.join()
    g2.join()
    g2.join()

    print('---------')

案例

import time,requests
import gevent
from gevent import monkey
import urllib.request
#猴子补丁,检测到耗时操作则切换
monkey.patch_all()

def download(url):
    # response = requests.get(url)
    # content = response.text
    response = urllib.request.urlopen(url)
    content = response.read()
    print(f'下载了{url}的数据,长度{len(content)}')


if __name__ == '__main__':
    urls = ['http://www.baidu.com','http://mail.126.com','http://cn.bing.com/?mkt=zh-CN',]
    g1 = gevent.spawn(download,urls[0])
    g2 = gevent.spawn(download,urls[1])
    g3 = gevent.spawn(download,urls[2])

    g1.join()
    g2.join()
    g3.join()

    print('---------')

原创文章,作者:506227337,如若转载,请注明出处:https://blog.ytso.com/tech/python/278461.html

(0)
上一篇 2022年8月2日 18:05
下一篇 2022年8月2日 18:46

相关推荐

发表回复

登录后才能评论