# -*- coding: utf-8 -*-
"""Base class for MIMIC dataset
"""
# Author: Yue Zhao <zhaoy@cmu.edu>
# License: BSD 2 clause
import os
import pandas as pd
import numpy as np
from .base import Standard_Template
from .mimic_clean_methods import clean_height, clean_weight, \
clean_temperature, clean_dbp, clean_sbp, clean_crr, clean_lab, \
clean_o2sat, clean_fio2
import sys
import warnings
# Silence all warnings unless warning options were given on the command
# line (``-W``), in which case the user's filters are respected.
# NOTE(review): simplefilter() edits the process-wide filter list, so merely
# importing this module also hides warnings raised by unrelated code
# (including pandas warnings about chained assignment); consider scoping it.
if not sys.warnoptions:
    warnings.simplefilter("ignore")
class MIMIC_Data(Standard_Template):
    """The data template to store MIMIC data.

    Customized fields can be added in each ``parse_xxx`` method. Parsed
    values are stored in ``self.data`` (presumably a dict initialized by
    ``Standard_Template`` with ``patient_id`` and an empty
    ``admission_list`` -- TODO confirm against the base class).

    Parameters
    ----------
    patient_id
        Identifier of the patient; used in the names of the saved csv files.
    time_duration : int or float
        Length of each aggregation window used by :meth:`generate_episode`,
        in seconds (it is compared against ``secs_since_entry``).
    selection_method : {'last', 'mean'}
        How several records of one variable inside a window are reduced to
        a single value.
    """

    # Variable name (``level2`` in the event mapping) -> cleaning function
    # applied to that variable's records before a value is selected.
    _VALUE_CLEANERS = {
        'temperature': clean_temperature,
        'diastolic blood pressure': clean_dbp,
        'systolic blood pressure': clean_sbp,
        'capillary refill rate': clean_crr,
        'weight': clean_weight,
        'height': clean_height,
        'fraction inspired oxygen': clean_fio2,
        'glucose': clean_lab,
        'ph': clean_lab,
        'oxygen saturation': clean_o2sat,
    }

    def __init__(self, patient_id, time_duration, selection_method):
        super(MIMIC_Data, self).__init__(patient_id=patient_id)
        self.time_duration = time_duration
        self.selection_method = selection_method

    def parse_patient(self, pd_series, mapping_dict=None):
        """Store the patient's ``gender`` and ``dob`` in ``self.data``.

        Parameters
        ----------
        pd_series : pandas.DataFrame
            Rows of the patient table for this patient (a DataFrame despite
            the name); only the first row is read.
        mapping_dict : dict, optional
            Maps the field names ``'gender'`` and ``'dob'`` to the column
            names used in ``pd_series``. By default the names are identical.
        """
        if mapping_dict is None:
            gender_col, dob_col = 'gender', 'dob'
        else:
            gender_col, dob_col = mapping_dict['gender'], mapping_dict['dob']
        self.data['gender'] = pd_series[gender_col].values[0]
        self.data['dob'] = pd_series[dob_col].values[0]

    def parse_admission(self, pd_df):
        """Append one dictionary per admission row to ``admission_list``.

        Parameters
        ----------
        pd_df : pandas.DataFrame
            Admission rows with ``hadm_id``, ``admittime``, ``dischtime`` and
            ``deathtime`` columns.
        """
        # TODO: implement the mapping dict
        for _, row in pd_df.iterrows():
            # each admission is stored as a separate dictionary
            admission_event = {
                'admission_id': row['hadm_id'],
                'admission_date': row['admittime'],
                'discharge_date': row['dischtime'],
                # 1 if this admission has a death time, else 0. Read from
                # the current row: reading pd_df's first row gave every
                # admission the first admission's status.
                'death_indicator': int(pd.notna(row['deathtime'])),
            }
            # more elements can be added here
            self.data['admission_list'].append(admission_event)

    def parse_icu(self, pd_df, mapping_dict=None):
        """Attach an ``icustay_id`` to each parsed admission.

        When several ICU stays share an admission, the last matching row of
        ``pd_df`` wins. Admissions without an ICU stay are left unchanged.

        Parameters
        ----------
        pd_df : pandas.DataFrame
            ICU stay rows with ``hadm_id`` and ``icustay_id`` columns.
        mapping_dict : dict, optional
            Currently unused.

        Raises
        ------
        ValueError
            If no admission has been parsed yet.
        """
        if not self.data['admission_list']:
            raise ValueError(
                "No admission information found. Parse admission info first.")
        for admission_event in self.data['admission_list']:
            icu_rows = pd_df.loc[
                pd_df['hadm_id'] == admission_event['admission_id']]
            if not icu_rows.empty:
                # column access keeps the column dtype (iterrows may upcast)
                admission_event['icustay_id'] = icu_rows['icustay_id'].iloc[-1]

    def parse_event(self, pd_df, save_dir='', event_mapping_df='',
                    var_list=None):
        """Save raw and windowed (episode) event data for each admission.

        For every admission with at least one event, the events are sorted
        by ``charttime``, annotated with ``first_entry`` and
        ``secs_since_entry`` and saved as ``<patient_id>_<admission_id>.csv``.
        They are then aggregated by :meth:`generate_episode`. The episode
        table is saved as ``<patient_id>_<admission_id>_episode.csv`` only
        when it has at least two rows. The file names are recorded in the
        admission dictionary under ``event_csv`` / ``episode_csv``.

        Parameters
        ----------
        pd_df : pandas.DataFrame
            Event rows with ``hadm_id``, ``itemid`` and a datetime-typed
            ``charttime`` column.
        save_dir : str, optional
            Output directory, created if missing. An empty string means the
            current working directory.
        event_mapping_df : pandas.DataFrame
            Maps ``itemid`` to the variable name in ``level2``. Required in
            practice despite the default.
        var_list : list of str
            Variables (``level2`` names) kept in the episodes. Required in
            practice despite the default.

        Returns
        -------
        temp_df : pandas.DataFrame
            Events of the last processed admission (for debugging).

        Raises
        ------
        ValueError
            If no admission has been parsed yet.
        """
        if not self.data['admission_list']:
            raise ValueError(
                "No admission information found. Parse admission info first.")

        # make the saving directory if needed. '' means the current directory
        # (os.mkdir('') would raise). exist_ok avoids a race when several
        # workers create the same directory at once.
        if save_dir and not os.path.isdir(save_dir):
            os.makedirs(save_dir, exist_ok=True)

        for admission_event in self.data['admission_list']:
            # get all the events of this admission
            temp_df = pd_df.loc[
                pd_df['hadm_id'] == admission_event['admission_id']]
            if temp_df.empty:
                continue

            file_stem = (str(self.data['patient_id']) + '_' +
                         str(admission_event['admission_id']))
            admission_event['event_csv'] = file_stem + '.csv'

            # sort by date and time, then measure time from the first entry.
            # Positional access avoids ambiguity with duplicated index labels.
            temp_df = temp_df.sort_values(by='charttime')
            temp_df['first_entry'] = pd.to_datetime(
                temp_df['charttime'].iloc[0])
            temp_df['secs_since_entry'] = (
                temp_df['charttime'] - temp_df['first_entry']
            ).dt.total_seconds()
            # speed up by dropping date format
            temp_df['charttime'] = temp_df['charttime'].astype(str)
            temp_df['first_entry'] = temp_df['first_entry'].astype(str)
            # save the raw event data to local csv
            temp_df.to_csv(
                os.path.join(save_dir, admission_event['event_csv']),
                index=False)

            # aggregate all events over consecutive time windows
            episode_df = self.generate_episode(
                temp_df, duration=self.time_duration,
                event_mapping_df=event_mapping_df,
                var_list=var_list)
            # only save episodes with at least two windows; an empty or
            # single-window episode is not saved
            if episode_df.shape[0] >= 2:
                admission_event['episode_csv'] = file_stem + '_episode.csv'
                episode_df.to_csv(
                    os.path.join(save_dir, admission_event['episode_csv']),
                    index=False)
            # admission_event is modified in place inside admission_list
        return temp_df  # for debug purpose

    def write_record(self, temp_list, temp_df, var):
        """Append the selected value of ``var`` for one window to ``temp_list``.

        Parameters
        ----------
        temp_list : list
            Row being built; extended in place and returned.
        temp_df : pandas.DataFrame
            Records of ``var`` inside the window, sorted by time. The value
            is read from its ``value`` column. It is not modified.
        var : str
            Variable name; selects the cleaning function, if any.

        Returns
        -------
        temp_list : list
            ``temp_list`` with one more element ('' when ``temp_df`` is
            empty).

        Raises
        ------
        ValueError
            If ``selection_method`` is neither 'last' nor 'mean' and there
            is data to select from.
        """
        # TODO: this may be sped up by aggregating on itemid, then taking
        # the last one
        if temp_df.empty:
            # if there is no information, leave the cell empty
            temp_list.append('')
            return temp_list

        cleaner = self._VALUE_CLEANERS.get(var)
        if cleaner is not None:
            # clean on a copy so the caller's frame (a slice) is not mutated
            temp_df = temp_df.copy()
            temp_df['value'] = cleaner(temp_df)

        if self.selection_method == 'last':
            # take the last record within the range
            temp_list.append(temp_df['value'].iloc[-1])
        elif self.selection_method == 'mean':
            temp_list.append(temp_df['value'].mean())
        else:
            raise ValueError(
                "selection_method must be 'last' or 'mean', got %r"
                % (self.selection_method,))
        return temp_list

    def generate_episode(self, pd_df, duration, event_mapping_df, var_list):
        """Aggregate events into consecutive windows of length ``duration``.

        Window ``j`` covers ``[j * duration, (j + 1) * duration)`` seconds
        since the first entry. Enough windows are created to cover the
        latest event. Each window that contains at least one event of
        interest yields one row: the window's upper bound, then one value
        per variable in ``var_list`` (see :meth:`write_record`). Windows
        without events of interest are skipped.

        Parameters
        ----------
        pd_df : pandas.DataFrame
            Events with ``itemid`` and ``secs_since_entry`` columns.
        duration : int or float
            Window length in seconds; must be positive.
        event_mapping_df : pandas.DataFrame
            Maps ``itemid`` to the variable name in ``level2``.
        var_list : list of str
            Variables to extract, in output column order.

        Returns
        -------
        episode_df : pandas.DataFrame
            One row per non-empty window, with columns
            ``self.episode_headers_``.

        Raises
        ------
        ValueError
            If ``duration`` is not positive.
        """
        if duration <= 0:
            raise ValueError("duration must be positive, got %r" % (duration,))
        # presumably sets self.episode_headers_ (defined in the base class)
        self.generate_episode_headers(var_list)
        headers = self.episode_headers_

        secs = pd_df['secs_since_entry']
        max_time_diff = secs.max()
        if pd.isna(max_time_diff):
            # no events with a valid timestamp: nothing to aggregate
            return pd.DataFrame(columns=headers)

        # only keep the events we are interested in
        key_df = event_mapping_df[event_mapping_df['level2'].isin(var_list)]

        # floor(max / duration) + 1 windows so the latest event is covered.
        # ceil(max / duration) dropped events lying exactly on the last
        # boundary, and all events when they shared one timestamp.
        n_episode = int(max_time_diff // duration) + 1

        rows = []
        for j in range(n_episode):
            threshold_l = j * duration
            threshold_h = (j + 1) * duration
            # find all the events within the window
            slice_df = pd_df.loc[(secs >= threshold_l) & (secs < threshold_h)]
            # attach variable names; one itemid may map to several rows
            temp_df = key_df.merge(slice_df, left_on='itemid',
                                   right_on='itemid')
            if temp_df.empty:
                # no valid information in this window, skip it
                continue
            # sort by time so 'last' picks the latest value in the window
            temp_df = temp_df.sort_values(by='secs_since_entry')
            # initialize the record with the window's upper bound
            record = [threshold_h]
            for var in var_list:
                record = self.write_record(
                    record, temp_df[temp_df['level2'] == var], var)
            rows.append(record)

        # build the frame once rather than concatenating inside the loop
        return pd.DataFrame(rows, columns=headers)
##############################################################################
def _subject_lookup(df, id_list):
    """Return a function mapping a subject_id to its rows of ``df``.

    ``df`` is filtered to ``id_list`` and grouped once, so each lookup is a
    dict access rather than a full scan of ``df``. Unknown ids yield an
    empty frame with the same columns.
    """
    subset = df.loc[df['subject_id'].isin(id_list)]
    groups = dict(iter(subset.groupby('subject_id', sort=False)))
    empty = subset.iloc[0:0]
    return lambda p_id: groups.get(p_id, empty)


def parallel_parse_tables(patient_id_list, patient_df, admission_df, icu_df,
                          event_df, event_mapping_df, duration,
                          selection_method, var_list, save_dir):
    """Parse the MIMIC tables for one batch of patients.

    For each patient id a ``MIMIC_Data`` object is built and filled, in
    order, from the patient, admission, ICU stay and event tables. Each step
    is skipped when the patient has no rows in that table.

    Parameters
    ----------
    patient_id_list : iterable
        Patient ids (``subject_id`` values) to process.
    patient_df, admission_df, icu_df, event_df : pandas.DataFrame
        MIMIC tables; each must have a ``subject_id`` column.
    event_mapping_df : pandas.DataFrame
        Maps ``itemid`` to the variable name in ``level2``.
    duration : int or float
        Episode window length in seconds.
    selection_method : {'last', 'mean'}
        How several values inside one window are reduced.
    var_list : list of str
        Variables kept in the episodes.
    save_dir : str
        Directory where the event / episode csv files are written.

    Returns
    -------
    valid_data_list : list of MIMIC_Data
        One parsed object per patient id, in input order.
    valid_id_list : list
        The corresponding patient ids.

    Raises
    ------
    ValueError
        If a patient appears more than once in ``patient_df``. Also raised
        by ``MIMIC_Data`` when ICU or event rows exist for a patient but no
        admission was parsed.
    """
    patient_ids = list(patient_id_list)
    # group every table once instead of scanning it for each patient
    get_patient = _subject_lookup(patient_df, patient_ids)
    get_admissions = _subject_lookup(admission_df, patient_ids)
    get_icu_stays = _subject_lookup(icu_df, patient_ids)
    get_events = _subject_lookup(event_df, patient_ids)

    valid_data_list, valid_id_list = [], []
    for p_id in patient_ids:
        # initialize the data template for this patient
        temp_data = MIMIC_Data(p_id, duration, selection_method)

        p_df = get_patient(p_id)
        if not p_df.empty:
            if p_df.shape[0] > 1:
                raise ValueError("Patient ID cannot be repeated")
            temp_data.parse_patient(p_df)

        a_df = get_admissions(p_id)
        if not a_df.empty:
            temp_data.parse_admission(a_df)

        i_df = get_icu_stays(p_id)
        if not i_df.empty:
            temp_data.parse_icu(i_df)

        e_df = get_events(p_id)
        if not e_df.empty:
            temp_data.parse_event(e_df, save_dir=save_dir,
                                  event_mapping_df=event_mapping_df,
                                  var_list=var_list)

        valid_data_list.append(temp_data)
        valid_id_list.append(p_id)
    return valid_data_list, valid_id_list