我正在迭代存储在我的坞中的csv文件。我想对行进行迭代。本地(w/o对接器)中的相同脚本在6分钟内完成执行,但在坞内读取20行需要一分钟或两分钟(有130万行)。正在读取的csv文件的大小为837 is。
守则如下:
## added a script in the process just for test
import datetime
import sys
import pandas as pd
cleanup_consent_column = "rwJIedeRwS"
omc_master_header = [u'PPAC District Code', u'State Name', u'District Name', u'Distributor Code', u'OMC Name', u'Distributor Contact No', u'Distributor Name', u'Distributor Address', u'SO Name', u'SO Contact', u'SALES AREA CODE', u'Email', u'DNO Name', u'DNO Contact', u'Lat_Mixed', u'Long_Mixed']
#OMC_DISTRIBUTOR_MASTER = "/mnt/data/NFS/TeamData/Multiple/external/mopng/5Feb18_master_ujjwala_latlong_dist_dno_so_v7.csv"
#PPAC_MASTER = "/mnt/data/NFS/TeamData/Multiple/external/mopng/ppac_master_v3_mmi_enriched_with_sanity_check.csv"
def clean(input_filepath, OMC_DISTRIBUTOR_MASTER, PPAC_MASTER, output_filepath):
print("Taylor Swift's clean.")
df = pd.read_csv(input_filepath, encoding='utf-8', dtype=object)
print ('length of input - {0} - num cols - {1}'.format(len(df), len(df.columns.tolist())))
## cleanup consent column
for x in df.columns.tolist():
if x.startswith("rwJIedeRwS"):
del df[x]
break
## strip ppac code from the baseline
df['consumer_id_name_ppac_code'] = df['consumer_id_name_ppac_code'].str.strip()
## merge with entity to get entity_ids
omc_distributor_master = pd.read_csv(OMC_DISTRIBUTOR_MASTER, dtype=object, usecols=omc_master_header)
omc_distributor_master = omc_distributor_master.add_prefix("omc_dist_master_")
df = pd.merge(
df, omc_distributor_master, how='left',
left_on=['consumer_id_name_distributor_code', 'consumer_id_name_omc_name'],
right_on=['omc_dist_master_Distributor Code', 'omc_dist_master_OMC Name']
)
## log if anything not found
print ('responses without distributor enrichment - {0}'.format(len(df[df['omc_dist_master_Distributor Code'].isnull()])))
print ('num distributors without enrichment - {0}'.format(
len(pd.unique(df[df['omc_dist_master_Distributor Code'].isnull()]['consumer_id_name_distributor_code']))
))
## converting date column
df['consumer_id_name_sv_date'] = pd.to_datetime(df['consumer_id_name_sv_date'], format="%d/%m/%Y")
df['consumer_id_name_sv_date'] = df['consumer_id_name_sv_date'].dt.strftime("%Y-%m-%d")
## add eventual_ppac_code
print ("generating eventual ppac code column")
count_de_rows = 0
start_time = datetime.datetime.now()
for i, row in df.iterrows():
count_de_rows += 1
if count_de_rows % 10000 == 0:
print(count_de_rows)
## if not found in master - use baseline data else go with omc master
if row['omc_dist_master_PPAC District Code'] != row['omc_dist_master_PPAC District Code']:
df.ix[i, 'eventual_ppac_code'] = row['consumer_id_name_ppac_code']
else:
df.ix[i, 'eventual_ppac_code'] = row['omc_dist_master_PPAC District Code']
print(datetime.datetime.now() - start_time)
print("I guess it's all alright!")
if __name__ == '__main__':
print("The main function has been called!")
clean(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4])发布于 2019-01-17 10:26:09
对接者== ubuntu系统的基本前提是一个逻辑上的谬误。是的,尽可能地优化代码是正确的,但是两个系统中的相同代码显示了不同的统计数据,对接速度很慢。尽管如此,为了减轻内存负担,我开始使用chunksize。使用如此大的数据进行(读和写)的上下文切换是导致坞客慢(特别是写)的原因。应该注意的是,内存不是问题,通过docker在持久存储中写入大数据比在我们的系统中要慢。
发布于 2018-12-21 08:25:25
为什么首先使用循环行?这似乎可以实现矢量化:
df["eventual_ppac_code"] = df["omc_dist_master_PPAC District Code"]
df.loc[df["omc_dist_master_PPAC District Code"] != df["omc_dist_master_PPAC District Code"], "eventual_ppac_code"] = df["consumer_id_name_ppac_code"]话虽如此,您究竟什么时候期望omc_dist_master_PPAC District Code 而不是 equal omc_dist_master_PPAC District Code呢?是同一列吗?
https://stackoverflow.com/questions/53869936
复制相似问题