I work in population health and take contracts from commercial companies to do research on their products. This is general-purpose code for identifying a target patient population from provincial datasets, including DAD (hospital discharge abstracts), PC (physician claims), NACRS (emergency department visits), PIN (drug dispenses), and REG (provincial registry). The same patient can have multiple rows in each database; for example, a patient who was hospitalized 3 times will appear as three rows in the DAD data. The code works as follows:
In section 1) Identify patients for case defn #1, a series of steps tags each relevant record and then filters on those tags. The datasets are linked together to check whether a particular patient meets the diagnostic code requirements. The pivot_table function aggregates to the unique patient level so that results can be summarized per unique patient. feature_tagger holds some frequently used functions so they stay out of this main script.
# Overall steps:
# 1) Patient definition: Had an ICD code and a procedure code within a time period
# 2) Output: A list of PHN_ENC of included patients; corresponding index date
# .. 'CaseDefn1_PatientDict_FINAL.txt'
# .. 'CaseDefn1_PatientDf_FINAL.csv'
# 3) Results: Analytic results
# ----------------------------------------------------------------------------------------------------------
import pandas as pd
import numpy as np
import datetime
import ctypes
import random
import feature_tagger.feature_tagger as ft
import data_descriptor.data_descriptor as dd
import data_transformer.data_transformer as dt
import var_creator.var_creator as vc
# Unrestrict pandas' output display
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 120)
# Control panel
save_file_switch = False # WARNING: will overwrite existing when == True
df_subsampling_switch = False # WARNING: make sure to turn off for final results
edge_date_inclusion = True # whether to include the last date in the range of inclusion criteria
testing_printout_switch = False
result_printout_switch = True
done_switch = True
df_subsampling_n = 15000
random_seed = 888
# Instantiate objects
ft_obj = ft.Tagger()
dt_obj = dt.Data_Transformer()
# Import data
loc = 'office'
if loc == 'office':
    directory = r'E:\My_Working_Primary\Projects\Data_Analysis\\'
elif loc == 'home':
    directory = r'C:\Users\MyStuff\Dropbox\Projects\Data_Analysis\\'
else: pass
refDataDir = r'_Data\RefData\\'
realDataDir = r'_Data\RealData\\'
resultDir = r'_Results\\'
file_dad = 'Prepped_DAD_Data.csv'
file_pc = 'Prepped_PC_Data.csv'
file_nacrs = 'Prepped_NACRS_Data.csv'
file_pin = 'Prepped_PIN_Data.csv'
file_reg = 'Prepped_REG_Data.csv'
df_dad = pd.read_csv(directory+realDataDir+file_dad, dtype={'PHN_ENC': str}, encoding='utf-8', low_memory=False)
df_pc = pd.read_csv(directory+realDataDir+file_pc, dtype={'PHN_ENC': str}, encoding='utf-8', low_memory=False)
df_nacrs = pd.read_csv(directory+realDataDir+file_nacrs, dtype={'PHN_ENC': str}, encoding='utf-8', low_memory=False)
df_pin = pd.read_csv(directory+realDataDir+file_pin, dtype={'PHN_ENC': str}, encoding='utf-8', low_memory=False)
df_reg = pd.read_csv(directory+realDataDir+file_reg, dtype={'PHN_ENC': str}, encoding='utf-8', low_memory=False)
# Create random sampling of df's to run codes faster
if df_subsampling_switch==True:
    if (df_subsampling_n>len(df_dad))|(df_subsampling_n>len(df_pc))|(df_subsampling_n>len(df_nacrs))|(df_subsampling_n>len(df_pin)):
        print ('Warning: Specified subsample size is larger than the total no. of rows of some of the datasets;')
        print ('As a result, resampling with replacement will be done to reach specified subsample size.')
    df_dad = dt_obj.random_n(df_dad, n=df_subsampling_n, on_switch=df_subsampling_switch, random_state=random_seed)
    df_pc = dt_obj.random_n(df_pc, n=df_subsampling_n, on_switch=df_subsampling_switch, random_state=random_seed)
    df_nacrs = dt_obj.random_n(df_nacrs, n=df_subsampling_n, on_switch=df_subsampling_switch, random_state=random_seed)
    df_pin = dt_obj.random_n(df_pin, n=df_subsampling_n, on_switch=df_subsampling_switch, random_state=random_seed)
# Format variable type
df_dad['ADMIT_DATE'] = pd.to_datetime(df_dad['ADMIT_DATE'], format='%Y-%m-%d')
df_dad['DIS_DATE'] = pd.to_datetime(df_dad['DIS_DATE'], format='%Y-%m-%d')
df_pc['SE_END_DATE'] = pd.to_datetime(df_pc['SE_END_DATE'], format='%Y-%m-%d')
df_pc['SE_START_DATE'] = pd.to_datetime(df_pc['SE_START_DATE'], format='%Y-%m-%d')
df_nacrs['ARRIVE_DATE'] = pd.to_datetime(df_nacrs['ARRIVE_DATE'], format='%Y-%m-%d')
df_pin['DSPN_DATE'] = pd.to_datetime(df_pin['DSPN_DATE'], format='%Y-%m-%d')
df_reg['PERS_REAP_END_RSN_DATE'] = pd.to_datetime(df_reg['PERS_REAP_END_RSN_DATE'], format='%Y-%m-%d')
# Import reference codes
file_rxCode = '_InStudyCodes_ATC&DIN.csv'
file_icdCode = '_InStudyCodes_DxICD.csv'
file_serviceCode = '_InStudyCodes_ServiceCode.csv'
df_rxCode = pd.read_csv(directory+refDataDir+file_rxCode, dtype={'ICD_9': str}, encoding='utf-8', low_memory=False)
df_icdCode = pd.read_csv(directory+refDataDir+file_icdCode, encoding='utf-8', low_memory=False)
df_serviceCode = pd.read_csv(directory+refDataDir+file_serviceCode, encoding='utf-8', low_memory=False)
# Defining study's constant variables
inclusion_start_date = datetime.datetime(2017, 4, 1, 00, 00, 00)
inclusion_end_date = datetime.datetime(2018, 3, 31, 23, 59, 59)
sp_serviceCode_dict = {df_serviceCode['Short_Desc'][0]:df_serviceCode['Health_Service_Code'][0]}
sp_serviceCode_val = sp_serviceCode_dict['ABC injection']
sp_dxCode_dict = {'DIABETES_ICD9': df_icdCode['ICD_9'][0], 'DIABETES_ICD10': df_icdCode['ICD_10'][0]}
sp_dxCode_val_icd9 = sp_dxCode_dict['DIABETES_ICD9']
sp_dxCode_val_icd10 = sp_dxCode_dict['DIABETES_ICD10']
# ----------------------------------------------------------------------------------------------------------
# 1) Identify patients for case def'n #1.
# Step 1 - Aged between 18 and 100 years old on the index date
# Step 2 - Had at least 1 recorded ICD diagnostic code based on physician visit (ICD-9-CA=9999 in PC) or
# hospitalization (ICD-10-CA=G9999 in DAD) during the inclusion period
# Step 3.1 - Had at least 1 specific procedure code (99.999O) during
# the inclusion period (Note: earliest ABC injection code date is the Index date)
# Step 3.2 - Construct index date
# Step 4 - Registered as a valid Alberta resident for 2 years before the index date and 1 year after the
# index date (determined from PR)
# 1.1) Get age at each service, then delete rows with age falling out of 18-100 range
df_dad_ageTrimmed = df_dad.copy()
df_dad_ageTrimmed = df_dad_ageTrimmed[(df_dad_ageTrimmed['AGE']>=18) & (df_dad_ageTrimmed['AGE']<=100)]
df_pc_ageTrimmed = df_pc.copy()
df_pc_ageTrimmed = df_pc_ageTrimmed[(df_pc_ageTrimmed['AGE']>=18) & (df_pc_ageTrimmed['AGE']<=100)]
# 1.2) Tag appropriate date within sp range > tag DIABETES code > combine tags
df_dad_ageTrimmed['DAD_DATE_TAG'] = ft_obj.date_range_tagger(df_dad_ageTrimmed, 'ADMIT_DATE',
start_date_range=inclusion_start_date, end_date_range=inclusion_end_date, edge_date_inclusion=
edge_date_inclusion)
df_dad_ageTrimmed['DAD_ICD_TAG'] = ft_obj.multi_var_cond_tagger(df_dad_ageTrimmed, repeat_var_base_name='DXCODE',
repeat_var_start=1, repeat_var_end=25, cond_list=[sp_dxCode_val_icd10])
df_dad_ageTrimmed['DAD_DATE_ICD_TAG'] = ft_obj.summing_all_tagger(df_dad_ageTrimmed, tag_var_list=['DAD_DATE_TAG',
'DAD_ICD_TAG'])
df_pc_ageTrimmed['PC_DATE_TAG'] = ft_obj.date_range_tagger(df_pc_ageTrimmed, 'SE_END_DATE',
start_date_range=inclusion_start_date, end_date_range=inclusion_end_date, edge_date_inclusion=
edge_date_inclusion)
df_pc_ageTrimmed['PC_ICD_TAG'] = ft_obj.multi_var_cond_tagger(df_pc_ageTrimmed, repeat_var_base_name='HLTH_DX_ICD9X_CODE_',
repeat_var_start=1, repeat_var_end=3, cond_list=[str(sp_dxCode_val_icd9)])
df_pc_ageTrimmed['PC_DATE_ICD_TAG'] = ft_obj.summing_all_tagger(df_pc_ageTrimmed, tag_var_list=['PC_DATE_TAG',
'PC_ICD_TAG'])
# Output a list of all patients PHN_ENC who satisfy the Date and DIABETES code criteria
df_dad_ageDateICDtrimmed = df_dad_ageTrimmed[df_dad_ageTrimmed['DAD_DATE_ICD_TAG']==1]
df_pc_ageDateICDtrimmed = df_pc_ageTrimmed[df_pc_ageTrimmed['PC_DATE_ICD_TAG']==1]
dad_patientList_diabetes_Code = df_dad_ageDateICDtrimmed['PHN_ENC'].unique().tolist()
pc_patientList_diabetes_Code = df_pc_ageDateICDtrimmed['PHN_ENC'].unique().tolist()
dad_pc_patientList_diabetes_Code = list(set(dad_patientList_diabetes_Code)|set(pc_patientList_diabetes_Code))
dad_pc_patientList_diabetes_Code.sort()
# 1.3.1) Tag appropriate date within sp range > tag ABC injection code > combine tags
df_pc_ageTrimmed['PC_PROC_TAG'] = df_pc_ageTrimmed['ABC_INJECT']
df_pc_ageTrimmed['PC_DATE_PROC_TAG'] = ft_obj.summing_all_tagger(df_pc_ageTrimmed, tag_var_list=['PC_DATE_TAG',
'PC_PROC_TAG'])
df_pc_ageDateProcTrimmed = df_pc_ageTrimmed[df_pc_ageTrimmed['PC_DATE_PROC_TAG']==1]
pc_patientList_procCode = df_pc_ageDateProcTrimmed['PHN_ENC'].unique().tolist()
dad_pc_patientList_diabetes_NprocCode = list(set(dad_pc_patientList_diabetes_Code)&set(pc_patientList_procCode))
dad_pc_patientList_diabetes_NprocCode.sort()
# 1.3.2) Find Index date
df_pc_ageDateProcTrimmed_pivot = pd.pivot_table(df_pc_ageDateProcTrimmed, index=['PHN_ENC'],
values=['SE_END_DATE', 'AGE', 'SEX', 'RURAL'], aggfunc={'SE_END_DATE':np.min, 'AGE':np.min,
'SEX':'first', 'RURAL':'first'})
df_pc_ageDateProcTrimmed_pivot = pd.DataFrame(df_pc_ageDateProcTrimmed_pivot.to_records())
df_pc_ageDateProcTrimmed_pivot = df_pc_ageDateProcTrimmed_pivot.rename(columns={'SE_END_DATE':'INDEX_DT'})
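The pivot step above can be illustrated with a tiny stand-alone sketch on made-up data: multiple rows per patient collapse to one row, with the earliest SE_END_DATE taken as the index date.

```python
# Minimal sketch of the pivot/aggregation step, using fabricated demo data:
# collapse multiple rows per patient into one row per unique patient,
# with the earliest service date becoming the index date.
import pandas as pd

demo = pd.DataFrame({
    'PHN_ENC': ['A', 'A', 'B'],
    'SE_END_DATE': pd.to_datetime(['2017-06-01', '2017-05-01', '2017-07-15']),
    'AGE': [40, 40, 55],
    'SEX': ['F', 'F', 'M'],
})
demo_pivot = pd.pivot_table(demo, index=['PHN_ENC'],
                            values=['SE_END_DATE', 'AGE', 'SEX'],
                            aggfunc={'SE_END_DATE': 'min', 'AGE': 'min', 'SEX': 'first'})
demo_pivot = pd.DataFrame(demo_pivot.to_records())
demo_pivot = demo_pivot.rename(columns={'SE_END_DATE': 'INDEX_DT'})
print(demo_pivot)  # one row per patient; patient A's INDEX_DT is 2017-05-01
```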
# 1.4) Filter by valid registry
# Create a list variable (based on index date) to indicate which fiscal years need to be valid according to
# the required 2 years before index and 1 year after index date, in df_pc_ageDateProcTrimmed_pivot
def extract_needed_fiscal_years(row): # extract 2 years before and 1 year after index date
    if int(row['INDEX_DT'].month) >= 4:
        index_yr = int(row['INDEX_DT'].year)+1
    else:
        index_yr = int(row['INDEX_DT'].year)
    first_yr = index_yr-2
    four_yrs_str = str(first_yr)+','+str(first_yr+1)+','+str(first_yr+2)+','+str(first_yr+3)
    return four_yrs_str
df_temp = df_pc_ageDateProcTrimmed_pivot.copy()
df_temp['FYE_NEEDED'] = df_temp.apply(extract_needed_fiscal_years, axis=1)
df_temp['FYE_NEEDED'] = df_temp['FYE_NEEDED'].apply(lambda x: x[0:].split(',')) # from whole string to list of string items
df_temp['FYE_NEEDED'] = df_temp['FYE_NEEDED'].apply(lambda x: [int(i) for i in x]) # from list of string items to list of int items
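As a sanity check on the fiscal-year helper, the same logic can be rerun on a single made-up index date: a June 2017 index date falls in the fiscal year ending 2018, so fiscal years ending 2016 through 2019 must be covered (2 before, the index year, 1 after).

```python
# Self-contained recheck of the fiscal-year logic on fabricated dates.
import datetime
import pandas as pd

def extract_needed_fiscal_years(row):  # same logic as above, condensed
    if int(row['INDEX_DT'].month) >= 4:
        index_yr = int(row['INDEX_DT'].year) + 1
    else:
        index_yr = int(row['INDEX_DT'].year)
    first_yr = index_yr - 2
    return ','.join(str(first_yr + i) for i in range(4))

row = pd.Series({'INDEX_DT': datetime.datetime(2017, 6, 15)})
print(extract_needed_fiscal_years(row))  # 2016,2017,2018,2019
```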
# Create a list variable to indicate the active fiscal year, in df_reg
df_reg['FYE_ACTIVE'] = np.where(df_reg['ACTIVE_COVERAGE']==1, df_reg['FYE'], np.nan)
df_reg_agg = df_reg.groupby(by='PHN_ENC').agg({'FYE_ACTIVE':lambda x: list(x)})
df_reg_agg = df_reg_agg.reset_index()
df_reg_agg['FYE_ACTIVE'] = df_reg_agg['FYE_ACTIVE'].apply(lambda x: [i for i in x if ~np.isnan(i)]) # remove float nan
df_reg_agg['FYE_ACTIVE'] = df_reg_agg['FYE_ACTIVE'].apply(lambda x: [int(i) for i in x]) # convert float to int
# Merge df's and create tag, if active years do not cover all the required fiscal year, exclude patients
# Create inclusion/exclusion patient list to apply to obtain patient cohort based on case def'n #1
df_temp_v2 = df_temp.merge(df_reg_agg, on='PHN_ENC', how='left')
df_temp_v2_trimmed = df_temp_v2[(df_temp_v2['FYE_NEEDED'].notnull())&(df_temp_v2['FYE_ACTIVE'].notnull())]
# Remove rows with missing on either variables
def compare_list_elements_btw_cols(row):
    if set(row['FYE_NEEDED']).issubset(row['FYE_ACTIVE']):
        return 1
    else:
        return 0
df_temp_v2_trimmed['VALID_REG'] = df_temp_v2_trimmed.apply(compare_list_elements_btw_cols, axis=1)
df_temp_v2_trimmed_v2 = df_temp_v2_trimmed[df_temp_v2_trimmed['VALID_REG']==1]
reg_patientList = df_temp_v2_trimmed_v2['PHN_ENC'].unique().tolist()
# Apply inclusion/exclusion patient list (from REG) to find final patients
# Obtain final patient list
df_final_defn1 = df_pc_ageDateProcTrimmed_pivot.merge(df_temp_v2_trimmed_v2, on='PHN_ENC', how='inner')
df_final_defn1 = df_final_defn1[['PHN_ENC', 'AGE_x', 'SEX_x', 'RURAL_x', 'INDEX_DT_x']]
df_final_defn1 = df_final_defn1.rename(columns={'AGE_x':'AGE', 'SEX_x':'SEX', 'RURAL_x':'RURAL', 'INDEX_DT_x':'INDEX_DT',})
df_final_defn1['PREINDEX_1Yr'] = (df_final_defn1['INDEX_DT']-pd.Timedelta(days=364)) # 364 because index date is counted as one pre-index date
df_final_defn1['PREINDEX_2Yr'] = (df_final_defn1['INDEX_DT']-pd.Timedelta(days=729)) # 729 because index date is counted as one pre-index date
df_final_defn1['POSTINDEX_1Yr'] = (df_final_defn1['INDEX_DT']+pd.Timedelta(days=364))
list_final_defn1 = df_final_defn1['PHN_ENC'].unique().tolist()
dict_final_defn1 = {'Final unique patients of case definition #1':list_final_defn1}
# Additional ask (later on)
# How: Create INDEX_DT_FIS_YR (index date fiscal year) by mapping INDEX_DT to fiscal year
def index_date_fiscal_year(row):
    if ((row['INDEX_DT'] >= datetime.datetime(2015, 4, 1, 00, 00, 00)) &
            (row['INDEX_DT'] < datetime.datetime(2016, 4, 1, 00, 00, 00))):
        return '2015/2016'
    elif ((row['INDEX_DT'] >= datetime.datetime(2016, 4, 1, 00, 00, 00)) &
            (row['INDEX_DT'] < datetime.datetime(2017, 4, 1, 00, 00, 00))):
        return '2016/2017'
    else:
        return 'Potential error'
df_final_defn1['INDEX_DT_FIS_YR'] = df_final_defn1.apply(index_date_fiscal_year, axis=1)
# 2) Output final patient list for future access
# WARNING: will overwrite existing
if save_file_switch == True:
    if df_subsampling_switch == True:
        f = open(directory+resultDir+'_CaseDefn1_PatientDict_Subsample.txt',"w")
        f.write(str(dict_final_defn1)+',')
        f.close()
        df_final_defn1.to_csv(directory+resultDir+'_CaseDefn1_PatientDf_Subsample.csv', sep=',', encoding='utf-8')
    elif df_subsampling_switch == False:
        f = open(directory+resultDir+'CaseDefn1_PatientDict_FINAL.txt',"w")
        f.write(str(dict_final_defn1)+',')
        f.close()
        df_final_defn1.to_csv(directory+resultDir+'CaseDefn1_PatientDf_FINAL.csv', sep=',', encoding='utf-8')
# 3) Results: Analytic results
if result_printout_switch == True:
    print ('Unique PHN_ENC N, (aged 18 to 100 during inclusion period) from DAD:')
    print (df_dad_ageTrimmed['PHN_ENC'].nunique())
    print ('Unique PHN_ENC N, (aged 18 to 100 during inclusion period) from PC:')
    print (df_pc_ageTrimmed['PHN_ENC'].nunique())
    print ('Unique PHN_ENC N, (aged 18 to 100 during inclusion period) from DAD or PC:')
    dd_obj = dd.Data_Comparator(df_dad_ageTrimmed, df_pc_ageTrimmed, 'PHN_ENC')
    print (dd_obj.unique_n_union())
    print ('Unique PHN_ENC N, (aged 18 to 100) and (had DIABETES code during inclusion period) from DAD:')
    print (df_dad_ageDateICDtrimmed['PHN_ENC'].nunique())
    print ('Unique PHN_ENC N, (aged 18 to 100) and (had DIABETES code during inclusion period) from PC:')
    print (df_pc_ageDateICDtrimmed['PHN_ENC'].nunique())
    print ('Unique PHN_ENC N, (aged 18 to 100) and (had DIABETES code during inclusion period) from DAD or PC:')
    print (len(dad_pc_patientList_diabetes_Code))
    print ('Unique PHN_ENC N, (aged 18 to 100) and (had DIABETES code during inclusion period)\
and (had ABC injection code) from DAD and PC:')
    print (df_pc_ageDateProcTrimmed_pivot['PHN_ENC'].nunique())
    print ('Unique PHN_ENC N, (aged 18 to 100) and (had DIABETES code during inclusion period)\
and (had ABC injection code) and (had AB resident around index date) from DAD, PC, and REG [Case Def #1]:')
    print (df_final_defn1['PHN_ENC'].nunique())
    # Additional analytic ask (later on)
    print ('Patient N by index date as corresponding fiscal year:')
    print (df_final_defn1['INDEX_DT_FIS_YR'].value_counts())
if done_switch == True:
    ctypes.windll.user32.MessageBoxA(0, b'Hello there', b'Program done.', 3)

My question is: considering maintainability and extensibility, how can I handle the additional asks (the parts under # Additional ask (later on)) more efficiently?

Posted 2019-04-13 00:40:16
If I understand correctly, most or all of your "projects" follow the same format, even though they use different files and look for different data in different fields.
That tells me you should try to squeeze the repeated parts out into separate helpers, and try to move the boilerplate parts into some kind of common framework.
(Note: it looks like you're using Dropbox, and I suspect you use it to move these files between office and home. I don't know if you're paying for Dropbox or just using the free version, and I suspect Dropbox is probably better at security than most. But be careful.)
Look at the help available via the pip install --help command, especially the -e (--editable) option. It allows you to install from a directory or a URL.
That would let you create a module that detects your home/office setup (if hostname == 'MY': ...home... else: ...office...) and makes whatever configuration is appropriate. Then you can do something like this:
from kubik88 import Config
from kubik88.data_analysis import *

(Doing import * lets you import functions and classes from another module; just set __all__ correctly.)
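Such a config module might look something like this; a minimal sketch only, assuming the hostname check suggested above and the directory layout from the question (the hostname 'MY' and the class shape are placeholders, not a real API; PureWindowsPath is used so the sketch runs on any OS):

```python
# Hypothetical sketch: pick office vs. home paths by hostname.
# 'MY' and the directory layout are assumptions taken from the question.
import socket
from pathlib import PureWindowsPath

class Config:
    HOME_HOSTNAME = 'MY'  # placeholder for the real home machine name

    def __init__(self, hostname=None):
        hostname = hostname or socket.gethostname()
        if hostname == self.HOME_HOSTNAME:
            self.base_path = PureWindowsPath(r'C:\Users\MyStuff\Dropbox\Projects\Data_Analysis')
        else:
            self.base_path = PureWindowsPath(r'E:\My_Working_Primary\Projects\Data_Analysis')
        # Sub-directories are the same in both locations.
        self.ref_data = PureWindowsPath(r'_Data\RefData')
        self.real_data = PureWindowsPath(r'_Data\RealData')
        self.results = PureWindowsPath('_Results')
```

Then file paths are built with `/` joins (e.g. `cfg.base_path / cfg.real_data / 'Prepped_DAD_Data.csv'`) instead of string concatenation with trailing backslashes.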
Create a class. Then create a template method on that class to summarize the work you're doing at the highest level. I based this on your comment text, and I stopped after a few steps because I expect you'll get the idea:
class PatientDataAnalysis:
    def analysis(self):
        self.instantiate_objects()
        self.import_data()
        self.random_sample_dataframes()
        self.update_column_formats()
        self.import_reference_codes()
        self.define_constant_variables()
        ...

Notice that I'm not doing any of the work here, just calling some methods that perform the "primitive" operations.
Next, define methods to do the basic things:
# Waaaay up at the top:
import pathlib

def import_data(self):
    ''' Import all the project data into dataframes. '''
    for abbr, filename in self.data_files.items():
        self.import_csv(abbr, filename)

def import_csv(self, abbr, filename, **kwargs):
    ''' Import one CSV file into a dataframe, and store it in
        `self.dataframes` keyed by abbr.
    '''
    options = (self.read_csv_options if not kwargs
               else {**self.read_csv_options, **kwargs})
    filespec = self.base_path / self.real_data / filename
    df = pd.read_csv(str(filespec), **options)
    self.dataframes[abbr] = df

Using this approach, you can now subclass the parent class and extend the methods you care about, keeping the default behavior wherever you don't care (or wherever it just works):
class DiabetesStudy2019(PatientDataAnalysis):
    def import_data(self):
        # Do the usual stuff
        super().import_data()
        # And also do one more thing:
        ...

Almost anything you find yourself doing more than once, you should write a function to do. And if you're lucky (or good), there will be a way to convert that function into a more "data-driven" approach:
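A runnable miniature of this template-method-plus-subclass pattern (with toy step methods standing in for the real pipeline stages; all names here are illustrative only):

```python
# Toy illustration of the template method + subclass-override pattern.
class PatientDataAnalysis:
    def analysis(self):
        # The template method fixes the order of the steps;
        # each step is a method a subclass may override.
        self.steps = []
        self.import_data()
        self.update_column_formats()
        return self.steps

    def import_data(self):
        self.steps.append('import_data')

    def update_column_formats(self):
        self.steps.append('update_column_formats')

class DiabetesStudy2019(PatientDataAnalysis):
    def import_data(self):
        super().import_data()              # do the usual stuff
        self.steps.append('extra_import')  # and one study-specific extra

print(DiabetesStudy2019().analysis())
# ['import_data', 'extra_import', 'update_column_formats']
```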
def reformat_date_field(self, df, fieldname, format='%Y-%m-%d'):
    df[fieldname] = pd.to_datetime(df[fieldname], format=format)

reformat_date_field(self.dataframes['dad'], 'ADMIT_DATE')
reformat_date_field(self.dataframes['dad'], 'DIS_DATE')
reformat_date_field(self.dataframes['pc'], 'SE_END_DATE')
reformat_date_field(self.dataframes['pc'], 'SE_START_DATE')
reformat_date_field(self.dataframes['nacrs'], 'ARRIVE_DATE')
reformat_date_field(self.dataframes['pin'], 'DSPN_DATE')
reformat_date_field(self.dataframes['reg'], 'PERS_REAP_END_RSN_DATE')

becomes:
date_fields = (('dad', 'ADMIT_DATE'), ('dad', 'DIS_DATE'), ('pc', 'SE_END_DATE'),
               ('pc', 'SE_START_DATE'), ('nacrs', 'ARRIVE_DATE'), ('pin', 'DSPN_DATE'),
               ('reg', 'PERS_REAP_END_RSN_DATE'))

for df, field in date_fields:
    reformat_date_field(self.dataframes[df], field)

(And probably some other data formats too, which will make your life easier.)
The goals here are to (1) make what's happening obvious by calling a named function, and (2) make the list of fields easy to extend or modify by storing the fields as data rather than as function calls.
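Put together, the data-driven version can be exercised end to end on a toy dict of dataframes (data fabricated for illustration; module-level functions are used here instead of methods to keep the sketch self-contained):

```python
import pandas as pd

def reformat_date_field(df, fieldname, format='%Y-%m-%d'):
    # Convert one string column to datetime in place.
    df[fieldname] = pd.to_datetime(df[fieldname], format=format)

dataframes = {
    'dad': pd.DataFrame({'ADMIT_DATE': ['2017-04-01'], 'DIS_DATE': ['2017-04-05']}),
    'pc':  pd.DataFrame({'SE_END_DATE': ['2017-06-30']}),
}
date_fields = (('dad', 'ADMIT_DATE'), ('dad', 'DIS_DATE'), ('pc', 'SE_END_DATE'))

# The field list is plain data: adding a dataset/column pair is one edit.
for key, field in date_fields:
    reformat_date_field(dataframes[key], field)

print(dataframes['dad']['ADMIT_DATE'].dtype)  # datetime64[ns]
```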
https://codereview.stackexchange.com/questions/217343