之前在Fortran中有讲到OpenMP的并行:在Fortran中OpenMP循环并行计算的常用语句。
此外还有:使用sh脚本文件实现半手动的并行计算。
这里讲如何在Python中如何实现多个任务的并行。在Python中,multiprocessing模块提供了一个Process类来代表一个进程对象。而在Fortran的OpenMP中是通过启动“线程”来实现并行。这是实现多任务并行常见的两种方法。直观的理解就是打开电脑的任务管理器(底部栏右击),在应用程序旁边就是“进程”。运行本文的例子程序时,可以发现Python会产生多个“进程”,实现多个任务的并行,截图如下:

而在运行Fortran的OpenMP例子时,单个进程占了近100%CPU(四个核,一个核为25%),也就是OpenMP通过产生多个“线程”,分给指定CPU运行,实现多任务的并行,截图如下:

以下是整理的一个Python多任务并行(multiprocessing模块Process类)的测试例子:
from multiprocessing import Process
import os
import time
def run_proc(name): # 要执行的代码
start_time = time.perf_counter()
time.sleep(2)
end_time = time.perf_counter()
print ('Process id running on %s = %s' % (name, os.getpid()), '; running time = %s' % (end_time-start_time))
if __name__ == '__main__':
# 串行
print('串行程序')
print('Process id = %s.' % os.getpid())
start_time = time.perf_counter()
run_proc('job1')
run_proc('job2')
run_proc('job3')
run_proc('job4')
end_time = time.perf_counter()
print('CPU执行时间(s)=', (end_time-start_time), '\n')
# 并行
print('并行程序')
print('Process id = %s.' % os.getpid())
start_time = time.perf_counter()
p1 = Process(target=run_proc, args=('job1',))
p2 = Process(target=run_proc, args=('job2',))
p3 = Process(target=run_proc, args=('job3',))
p4 = Process(target=run_proc, args=('job4',))
p1.start()
p2.start()
p3.start()
p4.start()
p1.join() # join()方法可以等待子进程结束后再继续往下运行
p2.join()
p3.join()
p4.join()
end_time = time.perf_counter()
print('运行时间(s)=', (end_time-start_time))
计算结果为:

Python运行速度会比Fortran慢,但熟悉后写起来挺方便的。在科学计算中,如果想用Python写代码,又想赶进度,除了手动操作并行外,还可以用上以上这种并行方法。尤其是当使用超算时,如果没有使用多任务并行,选择多个核的速度和选择一个核的运行的速度相差不多的,这时候如果仍然选择多个核(如12个核)进行运算,就很有可能会极大地浪费超算资源和科研经费。
如果是想返回函数值,可通过共享内存Value或Array来实现,参考资料为[2]。这里写出一个例子:
from multiprocessing import Process, Value
def run_proc(name, a, num): # 要执行的代码
num.value = a
if __name__ == '__main__':
num1 = Value('d', 0.0) # 共享内存
num2 = Value('d', 0.0) # 共享内存
p1 = Process(target=run_proc, args=('job1', 100, num1))
p2 = Process(target=run_proc, args=('job2', 200, num2))
p1.start()
p2.start()
p1.join()
p2.join()
print(num1.value)
print(num2.value)
计算结果为:

一个应用的例子如下(使用开源软件包:https://py.guanjihuan.com):
from multiprocessing import Process
import os
import time
import numpy as np
import guan
def main(parameter_array, task_index):
print ('Process id = %s' % (os.getpid()))
result_array = []
for parameter in parameter_array:
result = parameter*2
result_array.append(result)
time.sleep(np.random.uniform(1,10))
guan.write_one_dimensional_data(parameter_array, result_array, filename='task_index='+str(task_index))
if __name__ == '__main__':
cpus = 4
parameter_array_all = np.arange(0, 17, 1)
start_time = time.perf_counter()
process_array = []
for task_index in range(cpus):
parameter_array = guan.preprocess_for_parallel_calculations(parameter_array_all, cpus, task_index)
process_array.append(Process(target=main, args=(parameter_array, task_index)))
for process in process_array: # 运行子进程
process.start()
for process in process_array: # 等待子进程完成
process.join()
end_time = time.perf_counter()
print('运行时间=', (end_time-start_time), '\n')
f = open('result.txt', 'w')
for task_index in range(cpus):
with open('task_index='+str(task_index)+'.txt', 'r') as f0:
text = f0.read()
f.write(text)
f.close()
参考资料:
[1] https://docs.python.org/3/library/multiprocessing.html
[2] 多进程
[3] multiprocessing --- 基于进程的并行
[4] 使用OMP_NUM_THREADS=1进行Python多处理
【说明:本站主要是个人的一些笔记和代码分享,内容可能会不定期修改。为了使全网显示的始终是最新版本,这里的文章未经同意请勿转载。引用请注明出处:https://www.guanjihuan.com】
你好,你这个函数是直接打印结果的。如果我的函数是要返回一个结果,那么并行要怎么获取函数值
可通过共享内存Value来实现。我在博文最后更新了内容,可以参考下。参考资料为[2],在网页中搜索“共享内存”,有对应的内容。