Source code for pyhealth.datasets.mimicextract

import os
from typing import Optional, List, Dict, Tuple, Union

import pandas as pd

from pyhealth.data import Event, Visit, Patient
from pyhealth.datasets import BaseEHRDataset
from pyhealth.datasets.utils import strptime

# TODO: add other tables

class MIMICExtractDataset(BaseEHRDataset):
    """Base dataset for the MIMIC-Extract dataset.

    Reads the HDF5 data produced by
    [MIMIC-Extract](https://github.com/MLforHealth/MIMIC_Extract#step-4-set-cohort-selection-and-extraction-criteria).
    Works with files created with or without LEVEL2 grouping, with restricted cohort
    population sizes, and with other optional parameter values; it should also work
    with many customized versions of the pipeline.

    You can create or obtain a MIMIC-Extract dataset in several ways:

    * The default cohort dataset is [available on GCP](https://console.cloud.google.com/storage/browser/mimic_extract)
      (requires PhysioNet access provisioned in GCP).
    * Follow the [step-by-step instructions](https://github.com/MLforHealth/MIMIC_Extract#step-by-step-instructions)
      on the MIMIC_Extract github site, which include setting up a PostgreSQL database
      and loading the MIMIC-III data files.
    * Use the instructions at [MIMICExtractEasy](https://github.com/SphtKr/MIMICExtractEasy),
      which uses DuckDB instead and should be a good bit simpler.

    Any of these methods will provide you with a set of HDF5 files containing a
    cleaned subset of the MIMIC-III dataset. This class can be used to read that
    dataset (mainly the `all_hourly_data.h5` file). Consult the MIMIC-Extract
    documentation for all the options available for dataset generation (cohort
    selection, aggregation level, etc.).

    Args:
        dataset_name: name of the dataset.
        root: root directory of the raw data (should contain one or more HDF5 files).
        tables: list of tables to be loaded (e.g., ["vitals_labs", "interventions"]).
        code_mapping: a dictionary containing the code mapping information.
            The key is a str of the source code vocabulary and the value is of
            two formats:
                (1) a str of the target code vocabulary;
                (2) a tuple with two elements. The first element is a str of the
                    target code vocabulary and the second element is a dict with
                    keys "source_kwargs" or "target_kwargs" and values of the
                    corresponding kwargs for the `CrossMap.map()` method.
            Default is empty dict, which means the original code will be used.
        dev: whether to enable dev mode (only use a small subset of the data).
            Default is False.
        refresh_cache: whether to refresh the cache; if true, the dataset will
            be processed from scratch and the cache will be updated. Default is False.
        pop_size: If your MIMIC-Extract dataset was created with a pop_size parameter,
            include it here. This is used to find the correct filenames.
        itemid_to_variable_map: Path to the CSV file used for aggregation mapping
            during your dataset's creation. Probably the one located in the
            MIMIC-Extract repo at `resources/itemid_to_variable_map.csv`, or your
            own version if you have customized it.

    Attributes:
        task: Optional[str], name of the task (e.g., "mortality prediction").
            Default is None.
        samples: Optional[List[Dict]], a list of samples, each sample is a dict with
            patient_id, visit_id, and other task-specific attributes as key.
            Default is None.
        patient_to_index: Optional[Dict[str, List[int]]], a dict mapping patient_id
            to a list of sample indices. Default is None.
        visit_to_index: Optional[Dict[str, List[int]]], a dict mapping visit_id to a
            list of sample indices. Default is None.

    Examples:
        >>> from pyhealth.datasets import MIMICExtractDataset
        >>> dataset = MIMICExtractDataset(
        ...     root="/srv/local/data/mimic_extract",
        ...     tables=["C", "vitals_labs", "interventions"],
        ...     code_mapping={"NDC": ("ATC", {"target_kwargs": {"level": 3}})},
        ...     itemid_to_variable_map="resources/itemid_to_variable_map.csv",
        ... )
        >>> dataset.stat()
        >>> dataset.info()
    """

    def __init__(
        self,
        root: str,
        tables: List[str],
        dataset_name: Optional[str] = None,
        code_mapping: Optional[Dict[str, Union[str, Tuple[str, Dict]]]] = None,
        dev: bool = False,
        refresh_cache: bool = False,
        pop_size: Optional[int] = None,
        itemid_to_variable_map: Optional[str] = None,
        # is_icustay_visit: Optional[bool] = False  # TODO: implement fully
    ):
        if pop_size is not None:
            self._fname_suffix = f"_{pop_size}"
        else:
            self._fname_suffix = ""
        self._ahd_filename = os.path.join(root, f"all_hourly_data{self._fname_suffix}.h5")
        self._c_filename = os.path.join(root, f"C{self._fname_suffix}.h5")
        # MIMIC-Extract writes clinical notes to a separate notes file.
        self._notes_filename = os.path.join(root, f"notes{self._fname_suffix}.hdf")
        self._v_id_column = "hadm_id"  # 'icustay_id' if is_icustay_visit else 'hadm_id'

        # This could be implemented with MedCode.CrossMap; however, part of the idea
        # behind MIMIC-Extract is that the user can customize this mapping, so we
        # build a map specific to this dataset instance based on a possibly-customized CSV.
        self._vocab_map = {"chartevents": {}, "labevents": {}, "vitals_labs": {}}
        if itemid_to_variable_map is not None:
            # We are just going to read some metadata here...
            df_ahd = pd.read_hdf(self._ahd_filename, "vitals_labs")
            grptype = "LEVEL1" if "LEVEL1" in df_ahd.columns.names else "LEVEL2"
            itemid_map = pd.read_csv(itemid_to_variable_map)
            for linksto in self._vocab_map:
                df = itemid_map
                if linksto != "vitals_labs":
                    df = df[df["LINKSTO"] == linksto]
                # Pick the most common ITEMID to use for our vocabulary...
                df = df.sort_values(by="COUNT", ascending=False).groupby(grptype).head(1)
                df = df[[grptype, "ITEMID"]].set_index([grptype])
                # TODO: Probably a better way than iterrows? At least this is a small df.
                for r in df.iterrows():
                    self._vocab_map[linksto][r[0].lower()] = r[1]["ITEMID"]

        # Reverse engineered from mimic-code concepts SQL and MIMIC-Extract SQL...
        self._vocab_map["interventions"] = {
            "vent": 467,
            "adenosine": 4649,
            "dobutamine": 30042,
            "dopamine": 30043,
            "epinephrine": 30044,
            "isuprel": 30046,
            "milrinone": 30125,
            "norepinephrine": 30047,
            "phenylephrine": 30127,
            "vasopressin": 30051,
            "colloid_bolus": 46729,  # "Dextran" -- arbitrary! No general ITEMID exists!
            "crystalloid_bolus": 41491,  # "fluid bolus"
            "nivdurations": 468,
        }
        super().__init__(
            root=root,
            tables=tables,
            dataset_name=dataset_name,
            code_mapping=code_mapping,
            dev=dev,
            refresh_cache=refresh_cache,
        )
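    # For illustration (a sketch, not verified output): after construction with the
    # stock itemid_to_variable_map.csv, self._vocab_map maps lower-cased
    # MIMIC-Extract variable names to one representative ITEMID per table, roughly:
    #
    #     {
    #         "chartevents": {"heart rate": <itemid>, ...},
    #         "labevents": {"glucose": <itemid>, ...},
    #         "vitals_labs": {...},  # no LINKSTO filter: variables from both tables
    #         "interventions": {"vent": 467, ...},  # the hand-built map above
    #     }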
    def parse_basic_info(self, patients: Dict[str, Patient]) -> Dict[str, Patient]:
        """Helper function which parses the `patients` dataset (within `all_hourly_data.h5`).

        Will be called in `self.parse_tables()`.

        Docs:
            - PATIENTS: https://mimic.mit.edu/docs/iii/tables/patients/
            - ADMISSIONS: https://mimic.mit.edu/docs/iii/tables/admissions/

        Args:
            patients: a dict of `Patient` objects indexed by patient_id, which is
                updated with the MIMIC-III table results.

        Returns:
            The updated patients dict.
        """
        # read patients table
        patients_df = pd.read_hdf(self._ahd_filename, "patients")
        # sort by admission and discharge time
        df = patients_df.reset_index().sort_values(
            ["subject_id", "admittime", "dischtime"], ascending=True
        )
        # group by patient
        # TODO: This can probably be simplified--MIMIC-Extract includes only the first
        # ICU visit for each patient (see paper)... it is unclear whether it might be
        # easily modified to include multiple visits, however, so this may have value
        # for customized versions of the pipeline.
        df_group = df.groupby("subject_id")

        # parallel unit of basic information (per patient)
        def basic_unit(p_id, p_info):
            # pd.Timedelta cannot represent a ~300-year delta (MIMIC-III shifts the
            # ages of patients older than 89), so subtract two half-age deltas
            # instead of one full-age delta.
            tdeltahalf = pd.Timedelta(days=0.5 * 365.2425 * p_info["age"].values[0])
            patient = Patient(
                patient_id=p_id,
                birth_datetime=pd.to_datetime(
                    p_info["admittime"].values[0] - tdeltahalf - tdeltahalf
                ),
                death_datetime=p_info["deathtime"].values[0],
                gender=p_info["gender"].values[0],
                ethnicity=p_info["ethnicity"].values[0],
            )
            # load visits
            for v_id, v_info in p_info.groupby("hadm_id"):
                visit = Visit(
                    visit_id=v_id,
                    patient_id=p_id,
                    encounter_time=pd.to_datetime(v_info["admittime"].values[0]),
                    discharge_time=pd.to_datetime(v_info["dischtime"].values[0]),
                    discharge_status=v_info["hospital_expire_flag"].values[0],
                )
                # add visit
                patient.add_visit(visit)
            return patient

        # parallel apply
        df_group = df_group.parallel_apply(
            lambda x: basic_unit(x.subject_id.unique()[0], x)
        )
        # summarize the results
        for pat_id, pat in df_group.items():
            patients[pat_id] = pat
        return patients
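    # Why the two half-age deltas in basic_unit: pd.Timedelta stores nanoseconds in
    # an int64, which caps it at roughly 292 years. A sketch (illustrative, using a
    # de-identified age of 300):
    #
    #     >>> import pandas as pd
    #     >>> pd.Timedelta(days=365.2425 * 300)        # out-of-bounds error
    #     >>> pd.Timedelta(days=0.5 * 365.2425 * 300)  # fine: ~150 years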
    def parse_diagnoses_icd(self, patients: Dict[str, Patient]) -> Dict[str, Patient]:
        """Helper function which parses the `C` (ICD9 diagnosis codes) dataset
        (within `C.h5`) in a way compatible with MIMIC3Dataset.

        Will be called in `self.parse_tables()`.

        Docs:
            - DIAGNOSES_ICD: https://mimic.mit.edu/docs/iii/tables/diagnoses_icd/

        Args:
            patients: a dict of `Patient` objects indexed by patient_id.

        Returns:
            The updated patients dict.

        Note:
            MIMIC-III does not provide specific timestamps in the DIAGNOSES_ICD
            table, so we set them to None.
        """
        return self._parse_c(patients, table="DIAGNOSES_ICD")
    def parse_c(self, patients: Dict[str, Patient]) -> Dict[str, Patient]:
        """Helper function which parses the `C` (ICD9 diagnosis codes) dataset
        (within `C.h5`).

        Will be called in `self.parse_tables()`.

        Docs:
            - DIAGNOSES_ICD: https://mimic.mit.edu/docs/iii/tables/diagnoses_icd/

        Args:
            patients: a dict of `Patient` objects indexed by patient_id.

        Returns:
            The updated patients dict.

        Note:
            MIMIC-III does not provide specific timestamps in the DIAGNOSES_ICD
            table, so we set them to None.
        """
        return self._parse_c(patients, table="C")
    def _parse_c(self, patients: Dict[str, Patient], table: str = "C") -> Dict[str, Patient]:
        # read table
        df = pd.read_hdf(self._c_filename, "C")
        # drop records of the other patients
        df = df.loc[(list(patients.keys()), slice(None), slice(None)), :]
        dfgroup = df.reset_index().groupby("subject_id")

        captured_v_id_column = self._v_id_column

        # parallel unit for diagnoses (per patient)
        def diagnosis_unit(p_id, p_info):
            events = []
            for v_id, v_info in p_info.groupby(captured_v_id_column):
                # each row holds a list of codes; concatenate and deduplicate
                codes = set(v_info["icd9_codes"].sum())
                for code in codes:
                    event = Event(
                        code=code,
                        table=table,
                        vocabulary="ICD9CM",
                        visit_id=v_id,
                        patient_id=p_id,
                    )
                    events.append(event)
            return events

        # parallel apply
        dfgroup = dfgroup.parallel_apply(
            lambda x: diagnosis_unit(x.subject_id.unique()[0], x)
        )
        # summarize the results
        patients = self._add_events_to_patient_dict(patients, dfgroup)
        return patients
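    # A sketch of the expected C.h5 layout (values illustrative): rows are indexed
    # by (subject_id, hadm_id, icustay_id) and the `icd9_codes` column holds a
    # *list* of codes per row, e.g.:
    #
    #     subject_id  hadm_id  icustay_id  icd9_codes
    #     <id>        <id>     <id>        ['0389', '78559', ...]
    #
    # which is why diagnosis_unit concatenates the lists with .sum() and
    # deduplicates with set() before emitting one Event per code.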
    def parse_labevents(self, patients: Dict[str, Patient]) -> Dict[str, Patient]:
        """Helper function which parses the `vitals_labs` dataset (within
        `all_hourly_data.h5`) in a way compatible with MIMIC3Dataset.

        Features in `vitals_labs` are correlated with MIMIC-III ITEMID values, and
        those ITEMIDs that correspond to LABEVENTS table items in raw MIMIC-III
        will be added as events. This correlation depends on the contents of the
        provided `itemid_to_variable_map.csv` file. Note that this will likely
        *not* match the raw MIMIC-III data because of the
        harmonization/aggregation done by MIMIC-Extract.

        See also `self.parse_vitals_labs()`.

        Will be called in `self.parse_tables()`.

        Docs:
            - LABEVENTS: https://mimic.mit.edu/docs/iii/tables/labevents/

        Args:
            patients: a dict of `Patient` objects indexed by patient_id.

        Returns:
            The updated patients dict.
        """
        table = "LABEVENTS"
        return self._parse_vitals_labs(patients=patients, table=table)
    def parse_chartevents(self, patients: Dict[str, Patient]) -> Dict[str, Patient]:
        """Helper function which parses the `vitals_labs` dataset (within
        `all_hourly_data.h5`) in a way compatible with MIMIC3Dataset.

        Features in `vitals_labs` are correlated with MIMIC-III ITEMID values, and
        those ITEMIDs that correspond to CHARTEVENTS table items in raw MIMIC-III
        will be added as events. This correlation depends on the contents of the
        provided `itemid_to_variable_map.csv` file. Note that this will likely
        *not* match the raw MIMIC-III data because of the
        harmonization/aggregation done in MIMIC-Extract.

        Will be called in `self.parse_tables()`.

        Docs:
            - CHARTEVENTS: https://mimic.mit.edu/docs/iii/tables/chartevents/

        Args:
            patients: a dict of `Patient` objects indexed by patient_id.

        Returns:
            The updated patients dict.
        """
        table = "CHARTEVENTS"
        return self._parse_vitals_labs(patients=patients, table=table)
    def parse_vitals_labs(self, patients: Dict[str, Patient]) -> Dict[str, Patient]:
        """Helper function which parses the `vitals_labs` dataset (within
        `all_hourly_data.h5`).

        Events are added using the `MIMIC3_ITEMID` vocabulary, and the mapping is
        determined by the CSV file passed to the constructor in
        `itemid_to_variable_map`. Since MIMIC-Extract aggregates like events, only
        a single MIMIC-III ITEMID will be used to represent all like items in the
        MIMIC-Extract dataset--so the data here will likely *not* match raw
        MIMIC-III data. Which ITEMIDs are used depends on the aggregation level in
        your dataset (i.e. whether you used `--no_group_by_level2`).

        Will be called in `self.parse_tables()`.

        See also `self.parse_chartevents()` and `self.parse_labevents()`.

        Docs:
            - https://github.com/MLforHealth/MIMIC_Extract#step-4-set-cohort-selection-and-extraction-criteria

        Args:
            patients: a dict of `Patient` objects indexed by patient_id.

        Returns:
            The updated patients dict.
        """
        table = "vitals_labs"
        return self._parse_vitals_labs(patients=patients, table=table)
    def _parse_vitals_labs(self, patients: Dict[str, Patient], table: str = "vitals_labs") -> Dict[str, Patient]:
        linksto = table.lower()
        # read table
        df = pd.read_hdf(self._ahd_filename, "vitals_labs")
        # drop records of the other patients
        df = df.loc[(list(patients.keys()), slice(None), slice(None)), :]

        # parallel unit for lab (per patient)
        captured_v_id_column = self._v_id_column
        captured_vocab_map = self._vocab_map[linksto]

        def vl_unit(p_id, p_info):
            events = []
            for v_id, v_info in p_info.groupby(captured_v_id_column):
                for e_id, e_info in v_info.iterrows():
                    event = Event(
                        code=captured_vocab_map[e_info["variable"]],
                        table=table,
                        vocabulary="MIMIC3_ITEMID",
                        visit_id=v_id,
                        patient_id=p_id,
                        timestamp=pd.Timestamp.to_pydatetime(e_info["timestamp"]),
                        hours_in=int(e_info["hours_in"]),
                        # level_n=e_info['variable'],  # can be reverse-looked up, so save some memory here?
                        mean=e_info["mean"],  # Value is not stored in MIMIC3Dataset... why? Unit uncertainty? TODO: Units, somewhere?
                        count=e_info["count"],
                        std=e_info["std"],
                    )
                    events.append(event)
            return events

        ahd_index = ["subject_id", "hadm_id", "icustay_id", "hours_in"]
        df.columns = df.columns.values  # collapse the column MultiIndex to tuples
        is_level1 = len(df.columns[0]) == 5
        # drop columns not applicable to the wanted table...
        if is_level1:
            df = df.drop(columns=[col for col in df.columns if col[2] not in self._vocab_map[linksto]])
        else:
            df = df.drop(columns=[col for col in df.columns if col[0] not in self._vocab_map[linksto]])
        # "melt" down to a per-event representation...
        df = df.reset_index().melt(id_vars=ahd_index).dropna()
        if is_level1:
            _, _, df["variable"], _, df["Aggregation Function"] = zip(*df["variable"])
        else:
            df["variable"], df["Aggregation Function"] = zip(*df["variable"])
        # discard count == 0.0 rows
        df = df.loc[(df["Aggregation Function"] != "count") | (df["value"] != 0.0)]
        df = df.drop_duplicates()

        # Manual/brute-force "pivot", as the pivot functions do not work well with
        # the MultiIndex columns here...
        df = df.reset_index().sort_values(ahd_index + ["variable"]).set_index(ahd_index + ["variable"])
        df_mean = df.loc[df["Aggregation Function"] == "mean"].rename(columns={"value": "mean"})["mean"]
        df_count = df.loc[df["Aggregation Function"] == "count"].rename(columns={"value": "count"})["count"]
        df_std = df.loc[df["Aggregation Function"] == "std"].rename(columns={"value": "std"})["std"]
        if is_level1:
            # FIXME: Duplicates appear in the LEVEL1 representation... this is puzzling.
            # These should all be almost equal, or there is a significant problem.
            # For now, take some mean, and the highest count and std... though they
            # may not match. The LEVEL1 representation is usually not preferred anyway.
            # In theory, these should probably be aggregated??
            df_mean = df_mean[~df_mean.index.duplicated(keep="first")]
            df_count = df_count.sort_values(ascending=False)
            df_count = df_count[~df_count.index.duplicated(keep="first")]
            df_std = df_std.sort_values(ascending=False)
            df_std = df_std[~df_std.index.duplicated(keep="first")]
        df = pd.concat([df_mean, df_count, df_std], axis=1)
        df = df.reset_index().sort_values(ahd_index + ["variable"])

        # reconstruct nominal timestamps for hours_in values...
        df_p = pd.read_hdf(self._ahd_filename, "patients")
        df_p = df_p.loc[(list(patients.keys()), slice(None), slice(None)), :][["intime"]]
        df = df.merge(df_p, on=["subject_id", "hadm_id", "icustay_id"], how="left")
        df["timestamp"] = df["intime"].dt.ceil("H") + pd.to_timedelta(df["hours_in"], unit="H")

        group_df = df.groupby("subject_id")
        # parallel apply
        group_df = group_df.parallel_apply(
            lambda x: vl_unit(x.subject_id.unique()[0], x)
        )
        # summarize the results
        patients = self._add_events_to_patient_dict(patients, group_df)
        return patients
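    # A sketch of the wide-to-long transform in _parse_vitals_labs (column names
    # illustrative; the exact MultiIndex levels depend on your MIMIC-Extract
    # options). The hourly frame arrives wide, one (variable, aggregation) tuple
    # per column:
    #
    #     index: (subject_id, hadm_id, icustay_id, hours_in)
    #     columns: ("heart rate", "mean"), ("heart rate", "count"), ("heart rate", "std"), ...
    #
    # After melt() plus the manual pivot, there is one row per (index, variable)
    # with mean/count/std columns, which vl_unit turns into one Event per row.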
    def parse_interventions(self, patients: Dict[str, Patient]) -> Dict[str, Patient]:
        """Helper function which parses the `interventions` dataset (within
        `all_hourly_data.h5`).

        Events are added using the `MIMIC3_ITEMID` vocabulary, using a manually
        derived mapping corresponding to general items descriptive of the
        intervention. Since the raw MIMIC-III data had multiple codes, and
        MIMIC-Extract aggregates like items, these will not match raw MIMIC-III
        data. In particular, note that ITEMID 41491 ("fluid bolus") is used for
        `crystalloid_bolus` and ITEMID 46729 ("Dextran") is used for
        `colloid_bolus` because there is no existing general ITEMID for colloid
        boluses.

        Will be called in `self.parse_tables()`.

        Docs:
            - https://github.com/MLforHealth/MIMIC_Extract#step-4-set-cohort-selection-and-extraction-criteria

        Args:
            patients: a dict of `Patient` objects indexed by patient_id.

        Returns:
            The updated patients dict.
        """
        table = "interventions"
        linksto = table.lower()  # we might put these in CHARTEVENTS also?
        # read table
        df = pd.read_hdf(self._ahd_filename, "interventions")
        # drop records of the other patients
        df = df.loc[(list(patients.keys()), slice(None), slice(None)), :]

        # parallel unit for interventions (per patient)
        captured_v_id_column = self._v_id_column
        captured_vocab_map = self._vocab_map[linksto]

        def interv_unit(p_id, p_info):
            events = []
            for v_id, v_info in p_info.groupby(captured_v_id_column):
                for e_id, e_info in v_info.iterrows():
                    event = Event(
                        code=captured_vocab_map[e_info["variable"]],
                        table=table,
                        vocabulary="MIMIC3_ITEMID",
                        visit_id=v_id,
                        patient_id=p_id,
                        timestamp=pd.Timestamp.to_pydatetime(e_info["timestamp"]),
                        hours_in=int(e_info["hours_in"]),
                        intervention=e_info["variable"],
                    )
                    events.append(event)
            return events

        ahd_index = ["subject_id", "hadm_id", "icustay_id", "hours_in"]

        # reconstruct nominal timestamps for hours_in values...
        df_p = pd.read_hdf(self._ahd_filename, "patients")
        df_p = df_p.loc[(list(patients.keys()), slice(None), slice(None)), :][["intime"]]
        df = df.merge(df_p, left_on=["subject_id", "hadm_id", "icustay_id"], right_index=True, how="left")
        df["timestamp"] = df["intime"].dt.ceil("H") + pd.to_timedelta(df.index.get_level_values(3), unit="H")

        df = df.drop(columns=[col for col in df.columns if col not in self._vocab_map[linksto] and col not in ["timestamp"]])
        df = df.reset_index()
        df = df.melt(id_vars=ahd_index + ["timestamp"])
        # keep only the hours in which the intervention was present
        df = df[df["value"] > 0]
        df = df.sort_values(ahd_index)

        group_df = df.groupby("subject_id")
        # parallel apply
        group_df = group_df.parallel_apply(
            lambda x: interv_unit(x.subject_id.unique()[0], x)
        )
        # summarize the results
        patients = self._add_events_to_patient_dict(patients, group_df)
        return patients
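# A sketch of the `interventions` layout consumed by parse_interventions above
# (per the MIMIC-Extract docs; the column set depends on your extraction
# options): one 0/1 indicator column per intervention, one row per hour:
#
#     index: (subject_id, hadm_id, icustay_id, hours_in)
#     columns: vent, vasopressin, crystalloid_bolus, ...
#
# The `df[df["value"] > 0]` filter keeps only the hours in which an intervention
# was present, so each surviving row becomes one Event.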
if __name__ == "__main__":
    dataset = MIMICExtractDataset(
        root="../mimic3demo/grouping",
        tables=[
            # "DIAGNOSES_ICD",
            "C",
            # "LABEVENTS",
            # "CHARTEVENTS",
            "vitals_labs",
            "interventions",
        ],
        dev=True,
        refresh_cache=True,
        itemid_to_variable_map="../MIMIC_Extract/resources/itemid_to_variable_map.csv",
    )
    dataset.stat()
    dataset.info()