Python(2)多进程

Python多进程[可以多核] 多线程【No】协程

multiprocessing --- 基于进程的并行:https://docs.python.org/zh-cn/3/library/multiprocessing.html

Python进阶:https://ebook-python-study.readthedocs.io/zh_CN/latest/index.html

一、进程的定义

进程:一个程序运行起来后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元。
线程:操作系统能够进行运算调度的最小单位
协程:协程是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源)。 因为它自带CPU上下文。这样只要在合适的时机, 我们可以把一个协程 切换到另一个协程。 只要这个过程中保存或恢复 CPU上下文那么程序还是可以运行的。

https://anchorety.github.io/2018/12/30/python%E5%A4%9A%E8%BF%9B%E7%A8%8B/

在介绍Python中的线程之前,先明确一个问题,Python中的多线程是假的多线程! 为什么这么说,我们先明确一个概念,全局解释器锁(GIL)。

Python代码的执行由Python虚拟机(解释器)来控制。Python在设计之初就考虑要在主循环中,同时只有一个线程在执行,就像单CPU的系统中运行多个进程那样,内存中可以存放多个程序,但任意时刻,只有一个程序在CPU中运行。同样地,虽然Python解释器可以运行多个线程,只有一个线程在解释器中运行。

对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同时只有一个线程在运行。在多线程环境中,Python虚拟机按照以下方式执行。

1.设置GIL。

2.切换到一个线程去执行。

3.运行。

4.把线程设置为睡眠状态。

5.解锁GIL。

6.再次重复以上步骤。

对所有面向I/O的(会调用内建的操作系统C代码的)程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行

如果某线程并未使用很多I/O操作,它会在自己的时间片内一直占用处理器和GIL。也就是说,I/O密集型的Python程序比CPU密集型的Python程序更能充分利用多线程的好处。

任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

1.1 multiprocessing

1.2 joblib

Python之并行--基于joblib - 半个冯博士的文章 - 知乎 https://zhuanlan.zhihu.com/p/180347090

首先joblib里面最常用到的一个类和一个方法分别是ParalleldelayedParallel主要用于初始化并行计算时需要用到的参数,而delayed则主要用来指定需要被并行的参数。比如官方给出的以下示例:

1
2
3
4
from math import sqrt
from joblib import Parallel, delayed
Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

这段代码其实已经基本看出方法的主要使用模式了。

解释一下:

  • Parallel(n_jobs=2): 指定两个CPU(默认是分配给不同的CPU)
  • 后面的delayed(sqrt)表示要用的函数是sqrt,这里这种用法就非常类似C++里面的委托(delegate)
  • (i ** 2) for i in range(10): 这里注意(i**2)的括号和delayed(sqrt)是紧挨着的。这一小段表示要传递给delayed中指定的函数的参数是i^2

那么结合这么一小段程序,其实已经能大致理解它的使用方法了。这里最开始可能主要不习惯的是要用到了Python里面的内部函数机制。

二、多进程应用

2.1、python多进程读写公共数据 - 异步读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import multiprocessing, os, time  
import numpy as np

#公共数据
temp = np.zeros((4,12))
# 回调函数,用于多进程读写公共数据的,我的理解:多进程在回调函数这里是串行的,否则同时读写会乱掉
# 回调函数必须只有一个输入参数
def mycallback(index):
i,j = index[0],index[1]
temp[i,j] = i+j
# 多进程处理的耗时的算法函数
def multiprocess(i,j):
print("子进程开始执行>>> pid={},ppid={},编号{}".format(os.getpid(), os.getppid(), i))
time.sleep(2)
return (i,j)
if __name__ == '__main__':
time_start = time.time()
MultiP = multiprocessing.Pool(2) # 多进程

for i in range(3):
for j in range(4):
print("主进程开始执行>>> parent_pid={}".format(os.getpid()))
MultiP.apply_async(multiprocess, args=(i, j), callback=mycallback)

# 关闭进程池,停止接受其它进程
MultiP.close()
# 阻塞进程 等待进程池中的所有进程执行完毕,必须在close()之后调用
MultiP.join()
print("主进程终止")
time_end = time.time()
print('totally cost', time_end - time_start)
print(temp)
异步非阻塞式:

正因为是异步非阻塞式的,不用等待当前运行的子进程执行完毕,随时根据系统调度来进行进程切换。基本上主进程和三个子进程,四个进程是同时运行的。

2.2 python中多进程读取excel文件 - 异步读取,返回值管理

https://blog.csdn.net/hynkoala/article/details/93895004

首先准备:当前文件上级目录下有个excels目录,目录里存在15份.xls文件,每个文件1000条数据,需要通过多进程对这些文件读取为pandas的dataframe格式

进程池最大数量设为10:6.580815315246582

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# @datetime:6/26/0026
"""通过多进程加速读取excel的测试"""
__author__ = "hanyaning@deri.energy"
import os.path
import time
from service import logger
import pandas as pd
from multiprocessing import Pool

logger = logger.MyLogger("multi_process").getLogger()

def getExcelData(path):
logger.info("开始读取excel,当前进程pid:" + str(os.getpid()))
data = pd.DataFrame()
if not os.path.exists(path):
raise FileNotFoundError()
if os.path.isfile(path):
logger.info("读取Excel文件完毕,当前进程pid:" + str(os.getpid()))
return data.append(pd.read_excel(path, skiprows=1, skipfooter=1), sort=False)

if __name__ == "__main__":
excel_path = os.path.join(os.getcwd(), "../excels")
xls_names = [x for x in os.listdir(excel_path) if x.endswith(".xls")]
startTime = time.time()

p_list = []
# 使用进程池Pool
pool = Pool(processes=10)
pool_data_list = []
data = pd.DataFrame()
for file_name in xls_names:
# 需要注意不能直接在这里调用get方法获取数据,原因是apply_async后面 get()等待线程运行结束才会下一个,这里多进程会变成阻塞执行
pool_data_list.append(pool.apply_async(getExcelData, (os.path.join(excel_path, file_name))))
pool.close()
# 需要阻塞以下,等所有子进程执行完毕后主线程才继续执行
pool.join()
for pool_data in pool_data_list:
# 这里再使用get()方法可以获取返回值
data = data.append(pool_data.get())
endTime = time.time()
print(endTime - startTime)
print(len(data))

单进程读取对照测试

耗时:12.019948959350586

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class ExcelReader:

def __init__(self, path, file_suffix=".xls"):
self.path = path
self.file_suffix = file_suffix

def getData(self):
if not os.path.exists(self.path):
raise FileNotFoundError()
data = pd.DataFrame()
if os.path.isdir(self.path):
xls_names = [x for x in os.listdir(self.path) if x.endswith(self.file_suffix)]
logger.info("开始")
for xls_name in xls_names:
df = pd.read_excel(os.path.join(self.path, xls_name), skiprows=1, skipfooter=1)
data = data.append(df, sort=False)
logger.info("读取Excel文件完毕,共读取" + str(xls_names.__len__()) + "个文件")
return data


if __name__ == "__main__":
start = time.time()
reader = ExcelReader(os.path.join(os.getcwd(), "../excels"))
data = reader.getData()
end = time.time()
print(end - start)
print(len(data))