多进程


multiprocessing通过使用子进程而非线程有效的绕过了全局解释器锁。multiprocessing可以利用cpu的多核性能。multiprocessing的Api与threading类似

Process类

开启子进程的方法

  1. spawn
    • 启动一个全新的python解释器进程,子进程不继承父进程的文件描述符或其它资源,只继承和run相关的资源。windows默认
  2. fork
    • 父进程使用os.fork()来开启一个子进程。子进程继承父进程的所有资源。Unix默认。
  3. forkserver
    • 使用forkserver时,会启动一个服务器进程来调用os.fork()来创建子进程。
  4. 通过上下文对象创建子进程
    使用multiprocessing.get_context()方法来获取上下文对象,上下文对象有和multiprocessing相似的Api。
    对象在不同上下文创建的进程可能不兼容,fork上下文创建的锁不能传递给spawn或forkserver启动方法启动的进程。
    multiprocessing.get_context(method=None):

    • 返回一个Context对象,具有和multiprocessing 模块相同的API。
    • 如果method为None,则返回默认上下文对象
    • method为fork、spawn或forkserver
import multiprocessing 
import time 
def fun(i):
    print(f"process{i} start at {time.strftime('%X')}")


if __name__ == "__main__":
    ctx = multiprocessing.get_context()  
    p1 = ctx.Process(target=fun, args=(1,))
    p2 = ctx.Process(target=fun, args=(2,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Process

*class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, , daemon=None)
与threading.Thread的api类似。run()、start()、join()、name、is_alive()

  1. daemon
    • 守护进程的标志,当进程退出时,会尝试终止所有守护进程子进程。不允许在守护进程中创建子进程
  2. pid: 进程ID,start之前为None
  3. exitcode: 子进程退出代码。
    • 如果该进程尚未终止为None如果子进程run方法正常返回,退出代码将是0.如果它通过sys.exit()终止将返回一个N。
    • 如果时因为run内未捕获异常终止返回1
    • 如果由信号N终止,返回-N
  4. authkey: 进程的身份验证密钥(字节字符串)
    • multiprocessing初始化时,主进程使用os.urandom()分配一个随机字符串。
    • 创建Process对象时,它将继承父进程的身份验证密钥
  5. sentinel
    • 系统对象的数字句柄,当进程结束时变为ready
    • 如果要使用 multiprocessing.connection.wait() 一次等待多个事件,可以使用此值。否则调用 join() 更简单。
    • 在Windows上,这是一个操作系统句柄,可以与 WaitForSingleObject 和 WaitForMultipleObjects 系列API调用一起使用。在Unix上,这是一个文件描述符,可以使用来自 select 模块的原语。
  6. terminate():终止进程
    • Unix上这是使用SIGTERM 信号完成的;在Windows上使用 TerminateProcess() 。 请注意,不会执行退出处理程序和finally子句等。
    • 子进程的子进程不会被终止,它们会变成孤儿进程
  7. kill(): 与terminate相同,在Unix上使用SIGKILL信号
  8. close(): 关闭Process对象,释放与之关联的所有资源,如果底层进程仍在运行将会引发ValueError。

进程间交换对象

队列

multiprocessing的Queue类时queue.Queue的克隆,是一个线程安全的队列。put方法添加元素时如果队满会一直阻塞直到有空间放入元素。get方法获取元素时如果队空也会一直阻塞。
multiprocessing.Queue([maxsize])

  1. qsize():返回队列长度,但是由于多线程或多进程的上下文,数字不可靠。Unix平台会引起NotImplementedError
  2. empty():队列是否为空。因为多线程或多进程环境状态不可靠。
  3. put(obj, block=Ture, timeout=None):
    • 添加元素,block为True和timeout为None时会阻塞当前进程。直到有空的缓冲槽。
    • 如果timeout为正数,则会在超时后抛出queue.Full异常。如果block为False时,不会阻塞,会抛出queue.Full异常。
  4. put_nowait(obj): 等同于put(obj, block=False)
  5. get(block=True, timeout=None):
    • 获取元素。如果超时或者block为False会抛出queue.Empty异常。
  6. get_nowait(obj): 相当于get(False)
    get和put方法在队列关闭后会抛出ValueError(3.8)
  7. close():指示当前进程将不会再往队列中放入对象。一旦所有缓冲区的数据被写入管道之后,后台的线程会退出。
  8. join_thread():等待后台线程,再close方法之后调用,阻塞当前进程直到后台线程退出,确保缓冲区数据被写入管道。
  9. cancel_join_thread():防止join_thread方法阻塞当前进程。
    multiprocessing.SimpleQueue
    简化版的Queue
  10. close():关闭队列,释放内部资源。队列在被关闭后就不再被使用。不能再用get,put,empty方法
  11. empty()
  12. get()
  13. put(item)
    multiprocessing.JoinableQueue([maxsize])
    Queue子类额外添加了task_done和join方法
  14. task_done():
    • 支出之前进入队列的任务已经完成,由队列的消费者进程使用。每次调用get获取的任务,执行完成后调用task_done告诉队列该任务已经处理完成。
    • 如果join方法正在阻塞,则在所有对象都被处理完后返回。
  15. join(): 阻塞队列直到所有元素都被接受和处理完毕。
    • 当条目添加到队列的时候,未完成任务的计数就会增加。每当消费者进程调用 task_done() 表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候, join() 阻塞被解除。
import multiprocessing 
import random 
import time 
import random 

class Producer(multiprocessing.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue 

    def run(self):
        for i in range(10):
            item = random.randint(0,256)
            self.queue.put(item)
            print(f"producer append {item} to queue")
            time.sleep(1)
            print(f"the size of queue is {self.queue.qsize()}")

class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue 

    def run(self):
        while True:
            if self.queue.empty():
                print("queue is empty")
                break 
            else:
                time.sleep(2)
                item = self.queue.get()
                print(f"Consumer get {item}")
                time.sleep(1)


if __name__ =="__main__":
    queue = multiprocessing.Queue()
    producer = Producer(queue)
    consumer = Consumer(queue)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()

管道

multiprocessing.Pipe([duplex])
返回一对Connection对象,(con1,con2),分别表示管道两端。duplex默认为True,表示可以双向通信。如果False为单向的con1只能接收消息,con2只能发送。
multiprocessing.connection.Connection
连接对象,允许发送可序列化对象。

  1. send(obj):发生一个可序列化的对象
  2. recv():返回另一端使用send发送的对象,该方法会一直阻塞直到接收到对象,如果对端关闭了连接或没有东西可接收返回EOFerror
  3. fileno():返回由连接对象使用的描述符或者句柄
  4. close():关闭对象
    5.poll([timeout]):返回连接对象中释放有可以读取的数据。如果timeout是None那么将一直等待不会超时。
import multiprocessing 
import time 

def send(left, right):
    left.send(['left', time.strftime("%X")])
    print(left.recv())


def recv(left,right):
    right.send(['right', time.strftime("%X")])
    print(right.recv())


if __name__ == '__main__':
    left,right = multiprocessing.Pipe()
    s_p = multiprocessing.Process(target=send, args=(left,right))
    s_p.start()
    r_p = multiprocessing.Process(target=recv, args=(left,right))
    
    r_p.start()
    s_p.join()
    r_p.join()

共享内存

multiprocessing.Value(typecode_or_type, *args, lock=True)

返回从共享内存上创建的ctypes对象,默认情况下返回的对象实际上是经过了同步器包装过的,可以通过value属性访问对象本身。

  1. typecode_or_type指明了返回的对象类型。可能是ctype类型或array模块中每个类型对应的单字符长度的字符串。
  2. *args会传递给这个类的构造函数
  3. lock默认为True,将会新建一个递归式用于同步此值的访问操作。如果是Lock或RLock对象,那么这个传入的锁将会用于同步这个值的访问操作。如果为False,则这个对象的访问将没有锁保护,这个变量不是进程安全的。
from multiprocessing import Process, Value 

def f(v):
    with v.get_lock():  # += 类操作不具有原子性,使用对象内部关联锁
        v.value+=1

if __name__ == "__main__":
    v = Value('i',0)
    p1 = Process(target=f, args=(v,))
    p2 = Process(target=f, args=(v,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(v.value)

multiprocessing.Array(typecode_or_type, size_or_initializer,*,lock=True)

从共享内存申请并返回一个具有ctypes类型的数组对象,默认情况下返回值实际上是被同步器包装过的数组对象。

  1. size_or_initializer如果是整数,则表示数组长度,并且每个元素都会初始化为0,如果是一个序列,则会使用这个序列初始化数组中的每一元素,并且根据元素个数自动判断数组长度。
from multiprocessing import Process, Array 

def f(arr, i):
    arr[i]=i

if __name__ =="__main__":
    arr = Array('i', 10)
    processes = []
    for i in range(10):
        process = Process(target=f,args=(arr,i))
        processes.append(process)
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(arr[:])

管理器multiprocessing.Manager

管理器维护一个用于管理共享对象的服务,其他进程可以通过代理访问这些共享对象。
multiprocessing.Manager()返回一个已启动的SyncManager管理器对象,可以用于在不同进程中共享数据。
支持 list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。

from multiprocessing import Manager , Process

def f(mylist, i):
    mylist.append(i)

if __name__ =="__main__":
    manager = Manager()
    mylist = manager.list()
    processes = []
    for i in range(10):
        p = Process(target=f, args=(mylist, i))
        processes.append(p)
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(mylist)

进程池

multiprocessing.Pool([processes[,initalizer[,initargs[,maxtaskperchild[,[context]]]]])
返回一个进程池对象,它控制可以提交作业的工作进程池,支持超时和回调的异步结果以及并行的map。
– processes进程数,如果为None,则使用os.cup_count()返回的值
– 如果initalizer不为None,则每个工作进程将会在启动时调用initalizer(*initargs)

  1. apply(func[,args[,kwds]])
    • 使用args参数以及kwds命名参数调用func,在返回结果前阻塞
  2. apply_async(func[,args[,kwds[,callback[,error_callback]]]])
    • appyly的变种返回AsyncResult对象
    • callback和error_callback是一个接受单格参数的可调用对象,执行成功调用callback,否则调用error_callback
    • 回调函数应该立即执行完成,否则会阻塞负责处理结果的线程
  3. map(func, iterable[,chunksize])
    • 内置map()函数的并行版本,会保持阻塞到获得结果,该方法会将可迭代对象分割为许多块,提交给进程池,可以将chunksize设置为一个正整数从而近似指定块的大小
  4. map_async(func,iterable[,chunksize[,callback[,error_callback]]])
    • map的变种,返回AsyncResult对象
  5. close(): 阻止后续任务提交到进程池,当所有任务执行完成后,工作进程会退出。
  6. terminate():不等待未完成任务,立即停止工作进程,进程池对象被垃圾回收时,会立即调用termainate
  7. join(): 必须在close或terminate后调用

multiprocessing.pool.AsyncResult
Pool.apply_async和pool.map_async()返回的对象所属的类。

  1. get([timeout]):获取执行结果
  2. wait([timeout]): 阻塞直到返回结果
  3. ready(): 返回执行状态,是否已经完成
  4. successful():判断是否已经完成并且未引发异常。如果还未获得结果将引发ValueError
from multiprocessing import Pool 

import time 

def f(x):
    time.sleep(1)
    return x**x 

def mf(x):
    time.sleep(0.5)
    return x*2

def initializer(*args):
    print(args, time.strftime("%X"))

if __name__ =="__main__":
    with Pool(processes=4, initializer=initializer,initargs=("init-",)) as pool:
        print(f"apply - start {time.strftime('%X')}")
        print(pool.apply(f,(10,)))  # 阻塞直到运行完成
        print(f"apply - end{time.strftime('%X')}")
        print(f"apply_async - start {time.strftime('%X')}")
        result = pool.apply_async(f,(10,))  # 异步执行不阻塞当前进程
        print(f"apply_async - end{time.strftime('%X')}")
        print(result.get())

        print(f"map - start {time.strftime('%X')}")
        print(pool.map(mf,[i for i in range(10)]))  # 阻塞直到运行完成
        print(f"map- end{time.strftime('%X')}")
        print(f"map_async - start {time.strftime('%X')}")
        result = pool.map_async(mf,[i for i in range(10)])  # 异步执行不阻塞当前进程
        print(f"mapy_async - end{time.strftime('%X')}")
        print(result.get())

原创文章,作者:wure,如若转载,请注明出处:https://blog.ytso.com/271767.html

(0)
上一篇 2022年7月6日
下一篇 2022年7月6日

相关推荐

发表回复

登录后才能评论