在Python中优雅地用多进程

在Python中优雅地用多进程

摘自知乎:https://zhuanlan.zhihu.com/p/340657122

Python自带的多进程库 multiprocessing 可实现多进程。我想用这些短例子示范如何优雅地用多线程。中文网络上,有些人只是翻译了旧版的 Python官网的多进程文档。而我这篇文章会额外讲一讲下方加粗部分的内容。

  • 创建进程 Process,fork直接继承资源,所以初始化更快,spawn只继承必要的资源,所以更省内存,「程序的入口」 if name == main
  • 进程池 Pool,Pool只能接受一个参数,但有办法传入多个
  • 管道通信 Pipe,最基本的功能,运行速度快
  • 队列通信 Queue,有最常用的功能,运行速度稍慢
  • 共享内存 Manager Value,Python3.9 新特性 真正的共享内存 shared_memory

如下所示,中文网络上一些讲Python多进程的文章,很多重要的东西没讲(毕竟只是翻译了Python官网的多进程旧版文档)。上方的加粗部分他们没讲,但是这是做多进程总需要知道的内容。

目录(请挑选感兴趣的看,没必要全看)

  1. 多线程与多进程的区别
  2. 全局锁与多进程
  3. 子进程 Process
  4. 进程池 Pool
  5. 管道 Pipe
  6. 队列 Queue
  7. 共享内存 Manager
  8. 回答评论区的有用问题(别私信)
  9. 我为何写【在Python中优雅地用多进程】?

更新记录:第一版 2021-1-4,第二版 2021-1-8 被迫更新了一些私信问到的问题

1. 多线程与多进程的区别

多线程 threading: 一个人有与异性聊天和看剧两件事要做。单线程的她可以看完剧再去聊天,但这样子可能就没人陪她聊天了「哼,发消息不回」。我们把她看成一个CPU核心,为她开起多线程——先看一会剧,偶尔看看新消息,在两件事(线程)间来回切换。多线程:单个CPU核心可以同时做几件事,不至于卡在某一步傻等着。

用处:爬取网站信息(爬虫),等待多个用户输入

多进程 processing: 一个人有很多砖需要搬,他领取手套、推车各种物资(向系统申请了资源)然后开始搬砖。然而他身边有很多人,我们让这些人去帮他!(一核有难,八核围观)。于是他们做了分工,砖很快就搬完了。多进程让多个CPU核心可以一起做事,不至于只有一人干活而其他人傻站着。

用处:进行高性能计算。只有多进程方案设计合理,才能加速计算。

一核有难,七核围观

2. 全局锁与多进程

为何在Python里用多进程这么麻烦? 因为Python的线程是操作系统线程,因此要有Python全局解释器锁。一个python解释器进程内有一条主线程,以及多条用户程序的执行线程。即使在多核CPU平台上,由于GIL的存在,所以禁止多线程的并行执行。——来自百度百科词条 全局解释器锁。发展历程:

  1. Python全局锁。Python 3.2的时候更新过GIL。在我小时候,由于Python GIL的存在(全局解释器锁 Global Interpreter Lock) ,此时Python无法靠自己实现多进程
  2. 外部多进程通信。Python3.5。在2015年,要么用Python调用C语言(如Numpy此类用其他语言在底层实现多进程的第三方库),要么需要在外部代码(MPI 2015)
  3. 内置多进程通信。Python 3.6 才让 multiprocessing逐渐发展成一个能用的Python内置多进程库,可以进行进程间的通信,以及有限的内存共享
  4. 共享内存。Python 3.8 在2019年增加了新特性 shared_memory

3. 子进程 Process

多进程的主进程一定要写在程序入口 if __name__ ==’__main__’: 内部

def function1(id):  # 这里是子进程
    print(f'id {id}')

def run__process():  # 这里是主进程
    from multiprocessing import Process
    process = [mp.Process(target=function1, args=(1,)),
               mp.Process(target=function1, args=(2,)), ]
    [p.start() for p in process]  # 开启了两个进程
    [p.join() for p in process]   # 等待两个进程依次结束

# run__process()  # 主线程不建议写在 if外部。由于这里的例子很简单,你强行这么做可能不会报错
if __name__ =='__main__':
    run__process()  # 正确做法:主线程只能写在 if内部

尽管在这个简单的例子里,把主进程run__process()写在程序入口if外部不会有报错。但是你最好还是按我要求去做。详细解释的内容过长,我写在→「Python程序入口有重要功能(多线程)而非编程习惯」

上面的例子只是用Process开启了多进程,不涉及进程通信。当我准备把一个串行任务编排成多进程时,我还需要多进程通信。进程池Pool可以让主程序获得子进程的计算结果(不太灵活,适合简单任务),管道Pipe 队列Queue 等等 可以让进程之间进行通信(足够灵活)。共享值 Value 共享数组 Array 共享内容 shared_memory(Python 3.6 Python3.9 的新特性,还不太成熟)下面开讲。

Python多进程可以选择两种创建进程的方式,spawn 与 fork。分支创建:fork会直接复制一份自己给子进程运行,并把自己所有资源的handle 都让子进程继承,因而创建速度很快,但更占用内存资源。分产创建:spawn只会把必要的资源的handle 交给子进程,因此创建速度稍慢。详细解释请看 Stack OverFlow multiprocessing fork vs spawn 。(分产spawn 是我自己随便翻译的,有更好的翻译请推荐。我绝不把handle 翻译成句柄)

multiprocessing.set_start_method('spawn')  # default on WinOS or MacOS
multiprocessing.set_start_method('fork')   # default on Linux (UnixOS)

请注意:我说 分支fork 在初始化创建多进程的时候比 分产spawn 快,而不是说高性能计算会比较快。通常高性能计算需要让程序运行很久,因此为了节省内存以及进程安全,我建议选择 spawn。

4. 进程池 Pool

几乎Python多进程代码都需要你明明白白地调用Process。而进程池Pool 会自动帮我们管理子进程。Python的Pool 不方便传入多个参数,我这里提供两个解决思路:

思路1:函数 func2 需要传入多个参数,现在把它改成一个参数,无论你直接让args作为一个元组tuple、词典dict、类class都可以

思路2:使用 function.partial Passing multiple parameters to pool.map() function in Python。这个不灵活的方法固定了其他参数,且需要导入Python的内置库,我不推荐

import time

def func2(args):  # multiple parameters (arguments)
    # x, y = args
    x = args[0]  # write in this way, easier to locate errors
    y = args[1]  # write in this way, easier to locate errors

    time.sleep(1)  # pretend it is a time-consuming operation
    return x - y


def run__pool():  # main process
    from multiprocessing import Pool

    cpu_worker_num = 3
    process_args = [(1, 1), (9, 9), (4, 4), (3, 3), ]

    print(f'| inputs:  {process_args}')
    start_time = time.time()
    with Pool(cpu_worker_num) as p:
        outputs = p.map(func2, process_args)
    print(f'| outputs: {outputs}    TimeUsed: {time.time() - start_time:.1f}    \n')

    '''Another way (I don't recommend)
    Using 'functions.partial'. See https://stackoverflow.com/a/25553970/9293137
    from functools import partial
    # from functools import partial
    # pool.map(partial(f, a, b), iterable)
    '''

if __name__ =='__main__':
    run__pool()

5. 管道 Pipe

顾名思义,管道Pipe 有两端,因而 main_conn, child_conn = Pipe() ,管道的两端可以放在主进程或子进程内,我在实验中没发现主管道口main_conn 和子管道口child_conn 的区别。两端可以同时放进去东西,放进去的对象都经过了深拷贝:用 conn.send()在一端放入,用 conn.recv() 另一端取出,管道的两端可以同时给多个进程。conn是 connect的缩写。

import time

def func_pipe1(conn, p_id):
    print(p_id)

    time.sleep(0.1)
    conn.send(f'{p_id}_send1')
    print(p_id, 'send1')

    time.sleep(0.1)
    conn.send(f'{p_id}_send2')
    print(p_id, 'send2')

    time.sleep(0.1)
    rec = conn.recv()
    print(p_id, 'recv', rec)

    time.sleep(0.1)
    rec = conn.recv()
    print(p_id, 'recv', rec)


def func_pipe2(conn, p_id):
    print(p_id)

    time.sleep(0.1)
    conn.send(p_id)
    print(p_id, 'send')
    time.sleep(0.1)
    rec = conn.recv()
    print(p_id, 'recv', rec)


def run__pipe():
    from multiprocessing import Process, Pipe

    conn1, conn2 = Pipe()

    process = [Process(target=func_pipe1, args=(conn1, 'I1')),
               Process(target=func_pipe2, args=(conn2, 'I2')),
               Process(target=func_pipe2, args=(conn2, 'I3')), ]

    [p.start() for p in process]
    print('| Main', 'send')
    conn1.send(None)
    print('| Main', conn2.recv())
    [p.join() for p in process]

if __name__ =='__main__':
    run__pipe()

如果追求运行更快,那么最好使用管道Pipe而非下面介绍的队列Queue,详细请移步Python pipes and queues performance ↓

So yes, pipes are faster than queues – but only by 1.5 to 2 times, what did surprise me was that Python 3 is MUCH slower than Python 2 – most other tests I have done have been a bit up and down (as long as it is Python 3.4 – Python 3.2 seems to be a bit of a dog – especially for memory usage).

我小时候曾经用到Python多线程队列功能写过一个实际例子 ↓,若追求极致性能,还可以把里面的Queue改为Pipe。读取多个(海康\大华)网络摄像头的视频流 (使用opencv-python),解决实时读取延迟问题392 赞同 · 281 评论文章

Pipe还有 duplex参数 和 poll() 方法 需要了解。默认情况下 duplex==True,若不开启双向管道,那么传数据的方向只能 conn1 ← conn2 。conn2.poll()==True 意味着可以马上使用 conn2.recv() 拿到传过来的数据。conn2.poll(n) 会让它等待n秒钟再进行查询。

from multiprocessing import Pipe

conn1, conn2 = Pipe(duplex=True)  # 开启双向管道,管道两端都能存取数据。默认开启
# 
conn1.send('A')
print(conn1.poll())  # 会print出 False,因为没有东西等待conn1去接收
print(conn2.poll())  # 会print出 True ,因为conn1 send 了个 'A' 等着conn2 去接收
print(conn2.recv(), conn2.poll(2))  # 会等待2秒钟再开始查询,然后print出 'A False'

尽管我下面的例子不会报错,但这是因为它过于简单,没有真的开多线程去跑,也没有写在程序入口的if内部。很多时候 Pipe运行会快一点,但是它的功能太少了,得用 Queue。最明显的一个区别是:

conn1, conn2 = multiprocessing.Pipe()  # 管道有两端,某一端放入的东西,只能在另一端拿到
queue = multiprocessing.Queue()        # 队列只有一个,放进去的东西可以在任何地方拿到。

6. 队列 Queue

可以 import queue 调用Python内置的队列,在多线程里也有队列 from multiprocessing import Queue。下面提及的都是多线程的队列。

队列Queue 的功能与前面的管道Pipe非常相似:无论主进程或子进程,都能访问到队列,放进去的对象都经过了深拷贝。不同的是:管道Pipe只有两个断开,而队列Queue 有基本的队列属性,更加灵活,详细请移步Stack Overflow Multiprocessing – Pipe vs Queue

def func1(i):
    time.sleep(1)
    print(f'args {i}')

def run__queue():
    from multiprocessing import Process, Queue

    queue = Queue(maxsize=4)  # the following attribute can call in anywhere
    queue.put(True)
    queue.put([0, None, object])  # you can put deepcopy thing
    queue.qsize()  # the length of queue
    print(queue.get())  # First In First Out
    print(queue.get())  # First In First Out
    queue.qsize()  # the length of queue

    process = [Process(target=func1, args=(queue,)),
               Process(target=func1, args=(queue,)), ]
    [p.start() for p in process]
    [p.join() for p in process]

if __name__ =='__main__':
    run__queue()

除了上面提及的 Python多线程,读取多个(海康\大华)网络摄像头的视频流 ,我自己写的开源的强化学习库:小雅 ElegantRL 也使用了 Queue 进行多CPU多GPU训练,为了提速,我已经把Queue 改为 Pipe。

7. 共享内存 Manager

为了在Python里面实现多进程通信,上面提及的 Pipe Queue 把需要通信的信息从内存里深拷贝了一份给其他线程使用(需要分发的线程越多,其占用的内存越多)。而共享内存会由解释器负责维护一块共享内存(而不用深拷贝),这块内存每个进程都能读取到,读写的时候遵守管理(因此不要以为用了共享内存就一定变快)。

Manager可以创建一块共享的内存区域,但是存入其中的数据需要按照特定的格式,Value可以保存数值,Array可以保存数组,如下。这里不推荐认为自己写代码能力弱的人尝试。下面这里例子来自Python官网的Document

# https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing%20array#multiprocessing.Array

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

我删掉了Python 3.8 的shared_momery 介绍,这部分有Bug

下文来自 Stack Overflow,问题 Shared memory in multiprocessing 下thuzhf 的回答 2021-01 :

For those interested in using Python3.8 ‘s shared_memory module, it still has a bug which hasn’t been fixed and is affecting Python3.8/3.9/3.10 by now (2021-01-15). The bug is about resource tracker destroys shared memory segments when other processes should still have valid access. So take care if you use it in your code.

PyTorch 也有自带的多进程 torch.multiprocessing

How to share a list of tensors in PyTorch multiprocessing? rozyang 的回答 ,非常简单,核心代码如下:

import torch.multiprocessing as mp
tensor.share_memory_()

8. 回答评论区的有用问题(不建议私信)

正文已经结束,我把部分multiprocessing的代码都放在github。希望大家能写出让自己满意的多线程。我设计高性能的多进程时,会遵守以下规则:

  • 尽可能少传一点数据
  • 尽可能减少主线程的负担
  • 尽可能不让某个进程傻等着
  • 尽可能减少进程间通信的频率

9. 我为何写【在Python中优雅地用多进程】?

开源的深度强化学习(DRL)算法库 伯克利的Ray-project Rllib训练快,但太复杂,OpenAI的 SpinningUp简单,但不快(没有提及的开源库比不上它们,写于2020年)。刚好我又懂一点多进程、Numpy、深度学习框架、深度强化学习这些双层优化算法,所以我觉得自己也写一个DRL库难度不大,于是开源了强化学习库:小雅 ElegantRL。让别人好好看看,DRL库挺简单的一个东西弄那么复杂做什么?

尽管这个库会一直保持框架小巧、代码优雅来方便入门深度强化学习的人,但 ElegantRL 却把训练效率放在首位(正因如此,ElegantRL 与 SpinningUp的定位不同),所以我需要用Python的多进程来加速 DRL的训练。因而顺便写【在Python中优雅地用多进程】这篇东西。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注