我是一个使用dask的新手,我正在尝试理解如何利用dask多处理,而不是pathos多处理。令我惊讶的是,dask比pathos慢3-4倍。我显然做错了什么,希望能得到任何关于这方面的指导。
下面是我试图设置一个简单的算术操作的代码:
from pathos.multiprocessing import ProcessingPool as Pool
import pandas as pd, numpy as np,time
from dask import dataframe as dd
from dask.multiprocessing import get
from multiprocessing import cpu_count
nCores = cpu_count()
class test_pathos:
def __init__(self):
self.NumCols = 270
self.NumRows = 250000
self.cols = ['Col'+ str(i) for i in range(self.NumCols)]
self.data = pd.DataFrame(np.random.randint(0,5,size=(self.NumRows,self.NumCols)),columns=self.cols)
def ProcessCol(self,x):
colname = x.name
DQCol = colname + r'_DQ'
self.data.loc[:, DQCol] = self.data[colname] + 1
def AddTodata(self,colname):
DQColumn = colname+r'_DQ'
self.data.loc[:,DQColumn] = self.data[colname]+1
return self.data[DQColumn]
def AddProcess(self):
p = Pool(nodes = nCores)
ChangedCols = p.map(self.AddTodata,self.cols)
ChangedColsDf = pd.concat(ChangedCols,axis=1)
self.data = pd.concat([self.data,ChangedColsDf],axis=1)
def AddProcess_apply(self):
'---self.data.apply(self.ProcessCol)'
dd.from_pandas(self.data, npartitions=nCores).map_partitions(lambda df : df.apply(self.ProcessCol)).compute(scheduler='processes')
'----------------------------------------------------MAIN---------------------------------------------------------------------'
if __name__ == "__main__":
test_obj = test_pathos()
tinit = time.time()
shapebeforetransmutation = test_obj.data.shape
test_obj.AddProcess()
shapeaftertransmutation = test_obj.data.shape
print('Pathos call time is :', time.time() - tinit)
tinit = time.time()
test_obj.AddProcess_apply()
print('Dask call time is : ', time.time() - tinit)发布于 2020-02-16 06:29:19
性能取决于很多因素。它可能是管理费用、数据传输或其他任何东西。不幸的是,如果不重新做实验,就不太容易判断。
要了解更多关于Dask性能的信息,建议阅读Understanding Performance文档。
https://stackoverflow.com/questions/60154413
复制相似问题