“并行”指的是将一个大任务分解成多个子任务,同一时间一起解决的过程。Python 依靠它万金油的特性被各行各业拿去处理各种各样的数据,但是它缓慢的运行速度也历来被人诟病。在数据量较大时,巧妙运用并行处理可以省下海量的时间。
并行的问题包含诸多的概念,细细钻研起来有许多值得探索的内容。但从日常使用与提速这一角度看,我们可以直接借鉴包装好了的方法,而不用太过纠结内部机理。在下文,我总结了一些常见、易用的多进程并行处理方法(指多 CPU 并行,而非“多线程”!),分享了一些自己给代码加速的心得,仅做引子与概述。
常见的并行处理方法
目前,常用且较为简洁的 Python 多进程实现方法主要有以下几种:
multiprocessing.Pool()
Pool
即为进程池,可以提供指定数量的进程供用户调用。当有新的请求提交到Pool
时,如果池还没有满,那么就会创建一个新的进程用来执行该请求。最简洁的用法是使用map()
方法映射单个参数给函数:
from multiprocessing import Pool
def f(x:int):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
在上述方法里使用with
关键字是为了避免使用p.close()
方法。
如果需要映射多个参数,则只能对包含多个参数的方法进行封装,这个方法的缺点在于传递给函数的参数必须存储在序列里,例如tuple
或dict
。
from multiprocessing import Pool
def f1(y:tuple):
return y[0] * y[1] # 注意参数
def f2(z:dict):
return z['arg1'] * z['arg2'] # 注意参数
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f1, [(1, 2), (2, 3), (3, 5)]))
with Pool(5) as p:
print(p.map(f2, [('arg': 1, 'arg': 2), ('arg': 3, 'arg': 5)]
上述情况里,进程池里最多能容纳 5 个进程,而实际上只用了 2~3 个。如果请求的进程数大于池所能容纳的最大值,那么该请求就会等待,直到池中有进程结束。apply_async()
方法用于不阻塞的异步执行(无需等待当前进程执行完毕),能根据系统调度自动进行进程切换。
from multiprocessing import Pool
def f(x):
return x
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=5)
params= ['param1', 'param2', 'param3', 'param4', 'param5']
for param in params:
pool.apply_async(f, args=(param, ))
pool.close()
此外还有Pool.startmap_async()
、Pool.map_async()
方法,可以自行研究,这里不作赘述。
multiprocessing.Process()
Process
表示一个子进程中的任务,经常可以见到开启多个Process
对象进行并行。一个简单的案例是:
from multiprocessing import Process
def f(name):
print('Hello', name)
if __name__ == '__main__':
process_list = []
for i in range(5):
p = Process(target=f, args=(i,))
p.start()
process_list.append(p)
for i in process_list:
p.join()
其中,p.start()
表示创建子进程p
,p.join()
表示主线程等待p
终止。使用Process
的一个常见实例是结合timeout
机制进行多进程爬虫。
joblib
joblib
里面最常用到Parallel()
类和delayed()
方法。Parallel
用于指定CPU数量,而delayed
则用来指定需要执行的函数与被并行的参数。
官网上的案例如下:
from joblib import Parallel, delayed
from math import sqrt
if __name__ == '__main__':
Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10))
concurrent.futures
concurrent.futures
常用来实现多线程(ThreadPoolExecutor)和多进程(ProcessPoolExecutor)的异步并发,支持map()
方法。这也是一个简洁的方法。
import os
from concurrent.futures import ProcessPoolExecutor
def f(n):
print("I'm number {} with process id {}".format(n, os.getpid()))
with ProcessPoolExecutor(max_workers=10) as executor:
executor.map(f, range(10))
偶然发现的小众包——pqdm
pqdm
结合了进度条包tqdm
与concurrent.futures
,由此可以使用多个 CPU 进行高效的多进程并行。这个包是我在学习tqdm
时偶然发现的,结果发现它意外地简洁易用。这个包既支持传递多个参数给函数并行,也能够显示进度条,非常好用,强烈推荐。
基本的使用方法如下:
from pqdm.processes import pqdm
args = [
{'a': 1, 'b': 2},
{'a': 2, 'b': 3},
{'a': 3, 'b': 4},
{'a': 4, 'b': 5}
]
def multiply(a, b):
return a * b
results = pqdm(args, multiply, n_jobs=3, argument_type='kwargs')
print(results)
如果函数返回的是一个对象,那么可以使用从results
中逐个提取返回的对象并进行后续操作。个人认为,这个包的语法与上面几种相比更加精炼,用起来不容易出错。
我自己是在并行分块导出栅格时用了这个包。友情提醒:在遇到栅格读写这种内存密集型操作时,一定要关注内存的占用量。我在我的 MacBook 上开了 6 个核,每个核按np.array
的形式读取大小为 12000*12000 的栅格,直接把我的 32GB 内存给占爆了,而调成 5 个核就刚刚好。
关于代码提速的小建议
网上相关的内容很多,我在这儿列一些自己的心得感悟。
1、涉及到数据读取或交换时,使用pickle
库。例如,我在自己拯救者电脑上使用geopandas
读取一个几十万条记录的 shp 点文件需要约 40s,而我有数 10 个这样的文件。如果以后每次运行程序时都读取一次这该死的 shp 文件会占很多时间。后来发现,我可以一起读取所有的shp
文件,然后把它们一起转成pickle
格式。同样内容的文件,读取pickle
格式只需要 4s,这极大地节约了我的时间。
2、使用del
关键字,手动删掉一些数据内存占用大的中间变量。这也有助于在 CPU 上进行更密集的计算。
3、涉及到索引时巧妙使用字典dict
,这个是最快的,千万别逐行遍历pandas.DataFrame
。
4、少写全局变量,保持良好的变量命名习惯!在 Jupyter 下,我以前经常会把函数的参数名与外面代码块的变量名写成相同的值。这样写特别特别容易出错,说多了都是泪。这里的代码提速提的是调试的速。
5、题外话:GPU 加速,也就是常见的 CUDA。Python 与 CUDA 结合的产物是numba
库,它既支持 CPU 也支持 GPU。不过目前只能加速 Python 原生函数和部分 numpy 函数。网上有很多“一行代码让你的 python 加速10倍”这样的帖子,用的就是numba
下的@jit
。这个库我并不太熟,以后有时间再钻研。
参考资料
https://cainiaojiaocheng.com/Python/docs/3.10/library/multiprocessing
https://www.zhangshengrong.com/p/8AaYmbVba2/
https://joblib.readthedocs.io/en/latest/parallel.html
https://pqdm.readthedocs.io/en/latest/usage.html#different-ways-to-pass-arguments-to-function