from math import sqrt

from joblib import Parallel, delayed

# Minimal joblib demo: evaluate sqrt(i ** 2) for i = 0..9 with n_jobs=2
# and collect the results in input order.
roots = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
# Expected value of `roots`:
# [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
# Hand every (row, col) pair of the 3 x 4 grid to MultiP.apply_async.
# MultiP, multiprocess and mycallback are defined outside this snippet.
index_pairs = ((row, col) for row in range(3) for col in range(4))
for row, col in index_pairs:
    parent_pid = os.getpid()
    print("主进程开始执行>>> parent_pid={}".format(parent_pid))
    MultiP.apply_async(multiprocess, args=(row, col), callback=mycallback)
# @datetime:6/26/0026 """通过多进程加速读取excel的测试""" __author__ = "hanyaning@deri.energy" import os.path import time from service import logger import pandas as pd from multiprocessing import Pool
if __name__ == "__main__":
    # Benchmark: load every .xls file under ../excels through a process pool,
    # then print the elapsed wall-clock seconds and the combined length.
    excel_path = os.path.join(os.getcwd(), "../excels")
    xls_names = [x for x in os.listdir(excel_path) if x.endswith(".xls")]
    startTime = time.time()

    # Use a process pool so the files are read concurrently.
    pool = Pool(processes=10)
    pool_data_list = []
    for file_name in xls_names:
        # Do not call get() here: it blocks until that task has finished,
        # which would make the pool process the files one after another.
        #
        # args must be a tuple. Without the trailing comma the path string
        # itself is unpacked, one character per positional argument.
        # getExcelData is defined outside this block; it is assumed to take a
        # single file path and return a DataFrame -- confirm at its definition.
        pool_data_list.append(
            pool.apply_async(getExcelData, (os.path.join(excel_path, file_name),))
        )
    pool.close()
    # Block until every worker has finished before collecting results.
    pool.join()

    # All tasks are complete, so get() returns each result without waiting
    # (or re-raises the exception raised in the worker).
    frames = [pool_data.get() for pool_data in pool_data_list]
    # DataFrame.append was removed in pandas 2.0 and copied the accumulated
    # frame on every call; concatenate once instead. pd.concat raises on an
    # empty list, so fall back to an empty frame when no files were found.
    data = pd.concat(frames) if frames else pd.DataFrame()

    endTime = time.time()
    print(endTime - startTime)
    print(len(data))
def getData(self):
    """Read the Excel files in ``self.path`` into a single DataFrame.

    Only directory entries whose name ends with ``self.file_suffix`` are
    read. Each file is loaded with ``skiprows=1`` and ``skipfooter=1`` and
    the frames are concatenated in ``os.listdir`` order.

    Returns:
        The concatenated DataFrame. It is empty when ``self.path`` is not a
        directory or contains no matching file.

    Raises:
        FileNotFoundError: If ``self.path`` does not exist.
    """
    if not os.path.exists(self.path):
        raise FileNotFoundError()
    # An existing path that is not a directory yields no data.
    if not os.path.isdir(self.path):
        return pd.DataFrame()

    xls_names = [x for x in os.listdir(self.path) if x.endswith(self.file_suffix)]
    logger.info("开始")
    frames = [
        pd.read_excel(os.path.join(self.path, xls_name), skiprows=1, skipfooter=1)
        for xls_name in xls_names
    ]
    # DataFrame.append was removed in pandas 2.0 and re-copied the growing
    # frame on every call; concatenate once instead. pd.concat raises on an
    # empty list, hence the fallback.
    data = pd.concat(frames, sort=False) if frames else pd.DataFrame()
    logger.info("读取Excel文件完毕,共读取" + str(len(xls_names)) + "个文件")
    return data
if __name__ == "__main__":
    # Time a full ExcelReader.getData() run over ../excels, then print the
    # elapsed seconds followed by the length of the returned data.
    began = time.time()
    excel_dir = os.path.join(os.getcwd(), "../excels")
    loaded = ExcelReader(excel_dir).getData()
    finished = time.time()
    elapsed = finished - began
    print(elapsed)
    print(len(loaded))