multiprocessing通过使用子进程而非线程有效的绕过了全局解释器锁。multiprocessing可以利用cpu的多核性能。multiprocessing的Api与threading类似
Process类
开启子进程的方法
- spawn
- 启动一个全新的python解释器进程,子进程不继承父进程的文件描述符或其它资源,只继承和run相关的资源。windows默认
- fork
- 父进程使用os.fork()来开启一个子进程。子进程继承父进程的所有资源。Unix默认。
- forkserver
- 使用forkserver时,会启动一个服务器进程来调用os.fork()来创建子进程。
- 通过上下文对象创建子进程
使用multiprocessing.get_context()方法来获取上下文对象,上下文对象有和multiprocessing相似的Api。
对象在不同上下文创建的进程可能不兼容,fork上下文创建的锁不能传递给spawn或forkserver启动方法启动的进程。
multiprocessing.get_context(method=None):- 返回一个Context对象,具有和multiprocessing 模块相同的API。
- 如果method为None,则返回默认上下文对象
- method为fork、spawn或forkserver
import multiprocessing
import time
def fun(i):
print(f"process{i} start at {time.strftime('%X')}")
if __name__ == "__main__":
ctx = multiprocessing.get_context()
p1 = ctx.Process(target=fun, args=(1,))
p2 = ctx.Process(target=fun, args=(2,))
p1.start()
p2.start()
p1.join()
p2.join()
Process
*class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, , daemon=None)
与threading.Thread的api类似。run()、start()、join()、name、is_alive()
- daemon
- 守护进程的标志,当进程退出时,会尝试终止所有守护进程子进程。不允许在守护进程中创建子进程
- pid: 进程ID,start之前为None
- exitcode: 子进程退出代码。
- 如果该进程尚未终止为None如果子进程run方法正常返回,退出代码将是0.如果它通过sys.exit()终止将返回一个N。
- 如果时因为run内未捕获异常终止返回1
- 如果由信号N终止,返回-N
- authkey: 进程的身份验证密钥(字节字符串)
- multiprocessing初始化时,主进程使用os.urandom()分配一个随机字符串。
- 创建Process对象时,它将继承父进程的身份验证密钥
- sentinel
- 系统对象的数字句柄,当进程结束时变为ready
- 如果要使用 multiprocessing.connection.wait() 一次等待多个事件,可以使用此值。否则调用 join() 更简单。
- 在Windows上,这是一个操作系统句柄,可以与 WaitForSingleObject 和 WaitForMultipleObjects 系列API调用一起使用。在Unix上,这是一个文件描述符,可以使用来自 select 模块的原语。
- terminate():终止进程
- Unix上这是使用SIGTERM 信号完成的;在Windows上使用 TerminateProcess() 。 请注意,不会执行退出处理程序和finally子句等。
- 子进程的子进程不会被终止,它们会变成孤儿进程
- kill(): 与terminate相同,在Unix上使用SIGKILL信号
- close(): 关闭Process对象,释放与之关联的所有资源,如果底层进程仍在运行将会引发ValueError。
进程间交换对象
队列
multiprocessing的Queue类时queue.Queue的克隆,是一个线程安全的队列。put方法添加元素时如果队满会一直阻塞直到有空间放入元素。get方法获取元素时如果队空也会一直阻塞。
multiprocessing.Queue([maxsize])
- qsize():返回队列长度,但是由于多线程或多进程的上下文,数字不可靠。Unix平台会引起NotImplementedError
- empty():队列是否为空。因为多线程或多进程环境状态不可靠。
- put(obj, block=Ture, timeout=None):
- 添加元素,block为True和timeout为None时会阻塞当前进程。直到有空的缓冲槽。
- 如果timeout为正数,则会在超时后抛出queue.Full异常。如果block为False时,不会阻塞,会抛出queue.Full异常。
- put_nowait(obj): 等同于put(obj, block=False)
- get(block=True, timeout=None):
- 获取元素。如果超时或者block为False会抛出queue.Empty异常。
- get_nowait(obj): 相当于get(False)
get和put方法在队列关闭后会抛出ValueError(3.8) - close():指示当前进程将不会再往队列中放入对象。一旦所有缓冲区的数据被写入管道之后,后台的线程会退出。
- join_thread():等待后台线程,再close方法之后调用,阻塞当前进程直到后台线程退出,确保缓冲区数据被写入管道。
- cancel_join_thread():防止join_thread方法阻塞当前进程。
multiprocessing.SimpleQueue
简化版的Queue - close():关闭队列,释放内部资源。队列在被关闭后就不再被使用。不能再用get,put,empty方法
- empty()
- get()
- put(item)
multiprocessing.JoinableQueue([maxsize])
Queue子类额外添加了task_done和join方法 - task_done():
- 支出之前进入队列的任务已经完成,由队列的消费者进程使用。每次调用get获取的任务,执行完成后调用task_done告诉队列该任务已经处理完成。
- 如果join方法正在阻塞,则在所有对象都被处理完后返回。
- join(): 阻塞队列直到所有元素都被接受和处理完毕。
- 当条目添加到队列的时候,未完成任务的计数就会增加。每当消费者进程调用 task_done() 表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候, join() 阻塞被解除。
import multiprocessing
import random
import time
import random
class Producer(multiprocessing.Process):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
for i in range(10):
item = random.randint(0,256)
self.queue.put(item)
print(f"producer append {item} to queue")
time.sleep(1)
print(f"the size of queue is {self.queue.qsize()}")
class Consumer(multiprocessing.Process):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
if self.queue.empty():
print("queue is empty")
break
else:
time.sleep(2)
item = self.queue.get()
print(f"Consumer get {item}")
time.sleep(1)
if __name__ =="__main__":
queue = multiprocessing.Queue()
producer = Producer(queue)
consumer = Consumer(queue)
producer.start()
consumer.start()
producer.join()
consumer.join()
管道
multiprocessing.Pipe([duplex])
返回一对Connection对象,(con1,con2),分别表示管道两端。duplex默认为True,表示可以双向通信。如果False为单向的con1只能接收消息,con2只能发送。
multiprocessing.connection.Connection
连接对象,允许发送可序列化对象。
- send(obj):发生一个可序列化的对象
- recv():返回另一端使用send发送的对象,该方法会一直阻塞直到接收到对象,如果对端关闭了连接或没有东西可接收返回EOFerror
- fileno():返回由连接对象使用的描述符或者句柄
- close():关闭对象
5.poll([timeout]):返回连接对象中释放有可以读取的数据。如果timeout是None那么将一直等待不会超时。
import multiprocessing
import time
def send(left, right):
left.send(['left', time.strftime("%X")])
print(left.recv())
def recv(left,right):
right.send(['right', time.strftime("%X")])
print(right.recv())
if __name__ == '__main__':
left,right = multiprocessing.Pipe()
s_p = multiprocessing.Process(target=send, args=(left,right))
s_p.start()
r_p = multiprocessing.Process(target=recv, args=(left,right))
r_p.start()
s_p.join()
r_p.join()
共享内存
multiprocessing.Value(typecode_or_type, *args, lock=True)
返回从共享内存上创建的ctypes对象,默认情况下返回的对象实际上是经过了同步器包装过的,可以通过value属性访问对象本身。
- typecode_or_type指明了返回的对象类型。可能是ctype类型或array模块中每个类型对应的单字符长度的字符串。
- *args会传递给这个类的构造函数
- lock默认为True,将会新建一个递归式用于同步此值的访问操作。如果是Lock或RLock对象,那么这个传入的锁将会用于同步这个值的访问操作。如果为False,则这个对象的访问将没有锁保护,这个变量不是进程安全的。
from multiprocessing import Process, Value
def f(v):
with v.get_lock(): # += 类操作不具有原子性,使用对象内部关联锁
v.value+=1
if __name__ == "__main__":
v = Value('i',0)
p1 = Process(target=f, args=(v,))
p2 = Process(target=f, args=(v,))
p1.start()
p2.start()
p1.join()
p2.join()
print(v.value)
multiprocessing.Array(typecode_or_type, size_or_initializer,*,lock=True)
从共享内存申请并返回一个具有ctypes类型的数组对象,默认情况下返回值实际上是被同步器包装过的数组对象。
- size_or_initializer如果是整数,则表示数组长度,并且每个元素都会初始化为0,如果是一个序列,则会使用这个序列初始化数组中的每一元素,并且根据元素个数自动判断数组长度。
from multiprocessing import Process, Array
def f(arr, i):
arr[i]=i
if __name__ =="__main__":
arr = Array('i', 10)
processes = []
for i in range(10):
process = Process(target=f,args=(arr,i))
processes.append(process)
for p in processes:
p.start()
for p in processes:
p.join()
print(arr[:])
管理器multiprocessing.Manager
管理器维护一个用于管理共享对象的服务,其他进程可以通过代理访问这些共享对象。
multiprocessing.Manager()返回一个已启动的SyncManager管理器对象,可以用于在不同进程中共享数据。
支持 list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。
from multiprocessing import Manager , Process
def f(mylist, i):
mylist.append(i)
if __name__ =="__main__":
manager = Manager()
mylist = manager.list()
processes = []
for i in range(10):
p = Process(target=f, args=(mylist, i))
processes.append(p)
for p in processes:
p.start()
for p in processes:
p.join()
print(mylist)
进程池
multiprocessing.Pool([processes[,initalizer[,initargs[,maxtaskperchild[,[context]]]]])
返回一个进程池对象,它控制可以提交作业的工作进程池,支持超时和回调的异步结果以及并行的map。
– processes进程数,如果为None,则使用os.cup_count()返回的值
– 如果initalizer不为None,则每个工作进程将会在启动时调用initalizer(*initargs)
- apply(func[,args[,kwds]])
- 使用args参数以及kwds命名参数调用func,在返回结果前阻塞
- apply_async(func[,args[,kwds[,callback[,error_callback]]]])
- appyly的变种返回AsyncResult对象
- callback和error_callback是一个接受单格参数的可调用对象,执行成功调用callback,否则调用error_callback
- 回调函数应该立即执行完成,否则会阻塞负责处理结果的线程
- map(func, iterable[,chunksize])
- 内置map()函数的并行版本,会保持阻塞到获得结果,该方法会将可迭代对象分割为许多块,提交给进程池,可以将chunksize设置为一个正整数从而近似指定块的大小
- map_async(func,iterable[,chunksize[,callback[,error_callback]]])
- map的变种,返回AsyncResult对象
- close(): 阻止后续任务提交到进程池,当所有任务执行完成后,工作进程会退出。
- terminate():不等待未完成任务,立即停止工作进程,进程池对象被垃圾回收时,会立即调用termainate
- join(): 必须在close或terminate后调用
multiprocessing.pool.AsyncResult
Pool.apply_async和pool.map_async()返回的对象所属的类。
- get([timeout]):获取执行结果
- wait([timeout]): 阻塞直到返回结果
- ready(): 返回执行状态,是否已经完成
- successful():判断是否已经完成并且未引发异常。如果还未获得结果将引发ValueError
from multiprocessing import Pool
import time
def f(x):
time.sleep(1)
return x**x
def mf(x):
time.sleep(0.5)
return x*2
def initializer(*args):
print(args, time.strftime("%X"))
if __name__ =="__main__":
with Pool(processes=4, initializer=initializer,initargs=("init-",)) as pool:
print(f"apply - start {time.strftime('%X')}")
print(pool.apply(f,(10,))) # 阻塞直到运行完成
print(f"apply - end{time.strftime('%X')}")
print(f"apply_async - start {time.strftime('%X')}")
result = pool.apply_async(f,(10,)) # 异步执行不阻塞当前进程
print(f"apply_async - end{time.strftime('%X')}")
print(result.get())
print(f"map - start {time.strftime('%X')}")
print(pool.map(mf,[i for i in range(10)])) # 阻塞直到运行完成
print(f"map- end{time.strftime('%X')}")
print(f"map_async - start {time.strftime('%X')}")
result = pool.map_async(mf,[i for i in range(10)]) # 异步执行不阻塞当前进程
print(f"mapy_async - end{time.strftime('%X')}")
print(result.get())
原创文章,作者:wure,如若转载,请注明出处:https://blog.ytso.com/271767.html