Python专题, 语言

在Python中使用multiprocessing实现多个任务并行

之前在Fortran中有讲到OpenMP的并行:在Fortran中OpenMP循环并行计算的常用语句

此外还有:使用sh脚本文件实现半手动的并行计算

这里讲如何在Python中如何实现多个任务的并行。在Python中,multiprocessing模块提供了一个Process类来代表一个进程对象。而在Fortran的OpenMP中是通过启动“线程”来实现并行。这是实现多任务并行常见的两种方法。直观的理解就是打开电脑的任务管理器(底部栏右击),在应用程序旁边就是“进程”。运行本文的例子程序时,可以发现Python会产生多个“进程”,实现多个任务的并行,截图如下:

而在运行Fortran的OpenMP例子时,单个进程占了近100%CPU(四个核,一个核为25%),也就是OpenMP通过产生多个“线程”,分给指定CPU运行,实现多任务的并行,截图如下:

以下是整理的一个Python多任务并行(multiprocessing模块Process类)的测试例子:

from multiprocessing import Process
import os
import time

def run_proc(name): # 要执行的代码
    start_time = time.perf_counter()
    time.sleep(2)
    end_time = time.perf_counter()
    print ('Process id running on %s = %s' % (name, os.getpid()), '; running time = %s' % (end_time-start_time))


if __name__ == '__main__':

    # 串行
    print('串行程序')
    print('Process id = %s.' % os.getpid())
    start_time = time.perf_counter()
    run_proc('job1')
    run_proc('job2')
    run_proc('job3')
    run_proc('job4')
    end_time = time.perf_counter()
    print('CPU执行时间(s)=', (end_time-start_time), '\n')

    # 并行
    print('并行程序')
    print('Process id = %s.' % os.getpid())
    start_time = time.perf_counter()
    p1 = Process(target=run_proc, args=('job1',))
    p2 = Process(target=run_proc, args=('job2',))
    p3 = Process(target=run_proc, args=('job3',))
    p4 = Process(target=run_proc, args=('job4',))
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()  # join()方法可以等待子进程结束后再继续往下运行
    p2.join()  
    p3.join()  
    p4.join()
    end_time = time.perf_counter()
    print('运行时间(s)=', (end_time-start_time))

计算结果为:

Python运行速度会比Fortran慢,但熟悉后写起来挺方便的。在科学计算中,如果想用Python写代码,又想赶进度,除了手动操作并行外,还可以用上以上这种并行方法。尤其是当使用超算时,如果没有使用多任务并行,选择多个核的速度和选择一个核的运行的速度相差不多的,这时候如果仍然选择多个核(如12个核)进行运算,就很有可能会极大地浪费超算资源和科研经费。

如果是想返回函数值,可通过共享内存Value或Array来实现,参考资料为[2]。这里写出一个例子:

from multiprocessing import Process, Value

def run_proc(name, a, num): # 要执行的代码
    num.value = a

if __name__ == '__main__':
    num1 = Value('d', 0.0)  # 共享内存
    num2 = Value('d', 0.0)  # 共享内存
    p1 = Process(target=run_proc, args=('job1', 100, num1))
    p2 = Process(target=run_proc, args=('job2', 200, num2))
    p1.start()
    p2.start()
    p1.join()
    p2.join() 
    print(num1.value)
    print(num2.value)

计算结果为:

一个应用的例子如下(使用开源软件包:https://py.guanjihuan.com):

from multiprocessing import Process
import os
import time
import numpy as np
import guan

def main(parameter_array, task_index):
    print ('Process id = %s' % (os.getpid()))
    result_array = []
    for parameter in parameter_array:
        result = parameter*2
        result_array.append(result)
    time.sleep(np.random.uniform(1,10))
    guan.write_one_dimensional_data(parameter_array, result_array, filename='task_index='+str(task_index))

if __name__ == '__main__':
    cpus = 4
    parameter_array_all = np.arange(0, 17, 1) 
    start_time = time.perf_counter()
    process_array = []
    for task_index in range(cpus):
        parameter_array = guan.preprocess_for_parallel_calculations(parameter_array_all, cpus, task_index)
        process_array.append(Process(target=main, args=(parameter_array, task_index)))
    for process in process_array: # 运行子进程
        process.start()
    for process in process_array: # 等待子进程完成
        process.join() 
    end_time = time.perf_counter()
    print('运行时间=', (end_time-start_time), '\n')
    f = open('result.txt', 'w')
    for task_index in range(cpus):
        with open('task_index='+str(task_index)+'.txt', 'r') as f0:
            text = f0.read()
        f.write(text)
    f.close()

参考资料:

[1] https://docs.python.org/3/library/multiprocessing.html

[2] 多进程

[3] multiprocessing --- 基于进程的并行

[4] 使用OMP_NUM_THREADS=1进行Python多处理

3,425 次浏览

【说明:本站主要是个人的一些笔记和代码分享,内容可能会不定期修改。为了使全网显示的始终是最新版本,这里的文章未经同意请勿转载。引用请注明出处:https://www.guanjihuan.com

2 thoughts on “在Python中使用multiprocessing实现多个任务并行”

  1. 你好,你这个函数是直接打印结果的。如果我的函数是要返回一个结果,那么并行要怎么获取函数值

    1. 可通过共享内存Value来实现。我在博文最后更新了内容,可以参考下。参考资料为[2],在网页中搜索“共享内存”,有对应的内容。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注

Captcha Code