pyhealth.datasets.OMOPDataset
The OMOP Common Data Model (CDM) is an open community data standard designed to standardize the structure and content of observational health data. The OMOPDataset class provides a convenient interface for loading and working with OMOP CDM formatted databases.
Any database in OMOP CDM format can be processed, provided its tables are stored as CSV files under a single root directory. The raw data is converted into a structured dataset object that supports downstream modeling and analysis.
- Key Features:
Supports OMOP CDM version 5.x
Uses Polars for efficient data loading and processing
Automatic table loading with YAML configuration
Person-centric data organization
Links clinical events to visits via visit_occurrence_id
Compatible with standard OMOP vocabularies and concept IDs
Refer to the OMOP CDM documentation for more information about the data model.
- class pyhealth.datasets.OMOPDataset(root, tables, dataset_name=None, config_path=None, **kwargs)
Bases: BaseDataset
A dataset class for handling OMOP CDM (Common Data Model) data.
The Observational Medical Outcomes Partnership (OMOP) Common Data Model (CDM) is an open community data standard, designed to standardize the structure and content of observational data.
OMOP CDM provides a standardized way to represent observational health data, enabling consistent data analysis across different healthcare systems. The CDM includes standardized vocabularies and data structures for clinical events such as visits, conditions, procedures, drug exposures, measurements, and more.
See: https://www.ohdsi.org/data-standardization/
- The default tables loaded are:
person: demographics and basic patient information
visit_occurrence: hospital/clinic visits
death: mortality information
- Additional tables can be specified via the tables parameter:
condition_occurrence: diagnoses (ICD codes)
procedure_occurrence: procedures (CPT, ICD codes)
drug_exposure: medication orders and administrations
measurement: laboratory tests and vital signs
observation: clinical observations
device_exposure: medical device usage
The person_id field is used as the patient identifier across all tables. All clinical events are linked to visits via visit_occurrence_id.
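To make the linkage concrete, the plain-Python sketch below nests a few made-up rows first by person_id and then by visit_occurrence_id. The rows and the helper name group_by_person_and_visit are hypothetical; OMOPDataset performs this organization inside its own loading pipeline, not with this code.

```python
from collections import defaultdict
from typing import Dict, List

# Made-up rows standing in for records from OMOP clinical event tables.
# Only person_id and visit_occurrence_id matter for this illustration.
rows = [
    {"table": "condition_occurrence", "person_id": "1", "visit_occurrence_id": "10"},
    {"table": "drug_exposure", "person_id": "1", "visit_occurrence_id": "10"},
    {"table": "condition_occurrence", "person_id": "1", "visit_occurrence_id": "11"},
    {"table": "measurement", "person_id": "2", "visit_occurrence_id": "20"},
]


def group_by_person_and_visit(records: List[dict]) -> Dict[str, Dict[str, List[dict]]]:
    """Nest records as {person_id: {visit_occurrence_id: [record, ...]}}."""
    grouped: Dict[str, Dict[str, List[dict]]] = defaultdict(lambda: defaultdict(list))
    for record in records:
        grouped[record["person_id"]][record["visit_occurrence_id"]].append(record)
    return grouped


grouped = group_by_person_and_visit(rows)
for person_id, visits in grouped.items():
    for visit_id, events in visits.items():
        print(person_id, visit_id, [e["table"] for e in events])
```

Running it prints one line per (person, visit) pair, starting with `1 10 ['condition_occurrence', 'drug_exposure']`.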
- Parameters:
root (str) – The root directory where the OMOP dataset CSV files are stored.
tables (List[str]) – A list of additional tables to include beyond the default tables (person, visit_occurrence, death).
dataset_name (Optional[str]) – The name of the dataset. Defaults to “omop”.
config_path (Optional[str]) – The path to the YAML configuration file defining table schemas. If not provided, uses the default OMOP config.
dev (bool) – Whether to enable dev mode, which uses only the first 1000 patients for faster testing. Default is False.
refresh_cache (bool) – Whether to refresh the cache; if true, the dataset will be processed from scratch. Default is False.
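The following is a minimal sketch, under stated assumptions, of how the tables argument combines with the defaults and how one might check a root directory before constructing the dataset. The helpers resolve_tables and missing_csv_files, and the '<table>.csv' naming convention, are hypothetical; the actual file names come from the YAML configuration referenced by config_path.

```python
from pathlib import Path
from typing import List, Optional

# Default tables as listed in this documentation.
DEFAULT_TABLES = ["person", "visit_occurrence", "death"]


def resolve_tables(tables: Optional[List[str]]) -> List[str]:
    """Return the default tables followed by any extra tables, without duplicates."""
    resolved: List[str] = []
    for name in DEFAULT_TABLES + list(tables or []):
        if name not in resolved:
            resolved.append(name)
    return resolved


def missing_csv_files(root: str, tables: List[str]) -> List[str]:
    """Return tables whose assumed '<table>.csv' file does not exist under root."""
    root_path = Path(root)
    return [name for name in tables if not (root_path / f"{name}.csv").is_file()]


tables = resolve_tables(["condition_occurrence", "drug_exposure", "person"])
print(tables)
# ['person', 'visit_occurrence', 'death', 'condition_occurrence', 'drug_exposure']
print(missing_csv_files("/path/to/omop/data", tables))
```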
- task
Optional[str], name of the task (e.g., “mortality prediction”). Default is None.
- samples
Optional[List[Dict]], a list of samples; each sample is a dict with patient_id, visit_id, and other task-specific attributes as keys. Default is None.
- patient_to_index
Optional[Dict[str, List[int]]], a dict mapping patient_id to a list of sample indices. Default is None.
- visit_to_index
Optional[Dict[str, List[int]]], a dict mapping visit_id to a list of sample indices. Default is None.
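The two index attributes can be derived from samples in a single pass. This sketch assumes each sample dict carries patient_id and visit_id keys, as described above; the helper build_indices and the sample values are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def build_indices(samples: List[dict]) -> Tuple[Dict[str, List[int]], Dict[str, List[int]]]:
    """Map each patient_id and each visit_id to the positions of its samples."""
    patient_to_index: Dict[str, List[int]] = defaultdict(list)
    visit_to_index: Dict[str, List[int]] = defaultdict(list)
    for idx, sample in enumerate(samples):
        patient_to_index[sample["patient_id"]].append(idx)
        visit_to_index[sample["visit_id"]].append(idx)
    return dict(patient_to_index), dict(visit_to_index)


samples = [
    {"patient_id": "p1", "visit_id": "v1", "label": 0},
    {"patient_id": "p1", "visit_id": "v2", "label": 1},
    {"patient_id": "p2", "visit_id": "v3", "label": 0},
]
patient_to_index, visit_to_index = build_indices(samples)
print(patient_to_index)  # {'p1': [0, 1], 'p2': [2]}
print(visit_to_index)    # {'v1': [0], 'v2': [1], 'v3': [2]}
```

A patient with several visits maps to several indices, which is what makes patient-level train/test splits possible without leaking visits across splits.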
Examples
>>> from pyhealth.datasets import OMOPDataset
>>> from pyhealth.tasks import MortalityPredictionOMOP
>>>
>>> # Load OMOP dataset with clinical tables
>>> dataset = OMOPDataset(
...     root="/path/to/omop/data",
...     tables=["condition_occurrence", "procedure_occurrence",
...             "drug_exposure"],
...     dev=False,
... )
>>> dataset.stat()
>>> dataset.info()
>>>
>>> # Access patient data
>>> patient = dataset.get_patient("123")
>>> print(f"Patient has {len(patient.data_source)} events")
>>>
>>> # Get events by type
>>> visits = patient.get_events(event_type="visit_occurrence")
>>> conditions = patient.get_events(
...     event_type="condition_occurrence"
... )
>>>
>>> # Filter events by visit
>>> visit = visits[0]
>>> visit_conditions = patient.get_events(
...     event_type="condition_occurrence",
...     filters=[("visit_occurrence_id", "==",
...               visit.visit_occurrence_id)]
... )
>>>
>>> # Create task-specific samples
>>> mortality_task = MortalityPredictionOMOP()
>>> sample_dataset = dataset.set_task(task=mortality_task)
>>> print(f"Generated {len(sample_dataset)} samples")
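The filters argument in the example above takes (attribute, operator, value) tuples. The sketch below shows one plausible reading of that format on plain dicts, assuming that multiple filters combine with logical AND and that events lacking the attribute are dropped; apply_filters and OPS are hypothetical names, not part of PyHealth.

```python
import operator
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical mapping from operator strings to comparison functions.
OPS: Dict[str, Callable[[Any, Any], bool]] = {
    "==": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}

Filter = Tuple[str, str, Any]


def apply_filters(events: List[Dict[str, Any]], filters: List[Filter]) -> List[Dict[str, Any]]:
    """Keep events that have every filtered attribute and satisfy all filters (logical AND)."""
    return [
        event
        for event in events
        if all(attr in event and OPS[op](event[attr], value) for attr, op, value in filters)
    ]


events = [
    {"event_type": "condition_occurrence", "visit_occurrence_id": "10"},
    {"event_type": "condition_occurrence", "visit_occurrence_id": "11"},
    {"event_type": "condition_occurrence", "visit_occurrence_id": "10"},
]
same_visit = apply_filters(events, [("visit_occurrence_id", "==", "10")])
print(len(same_visit))  # 2
```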
- preprocess_person(df)
Preprocesses the person table by constructing birth_datetime.
Concatenates year_of_birth, month_of_birth, and day_of_birth into a single birth_datetime field. Missing month/day default to 01.
- Parameters:
df (pl.LazyFrame) – The input dataframe containing person data.
- Returns:
The processed dataframe with birth_datetime.
- Return type:
pl.LazyFrame
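A plain-Python sketch of the birth_datetime construction described above, assuming the parts are concatenated as 'YYYY-MM-DD' and that a blank CSV cell counts as missing. The real method operates on a pl.LazyFrame; build_birth_datetime is a hypothetical per-row helper.

```python
from datetime import datetime
from typing import Optional, Union

Part = Optional[Union[int, str]]


def _or_default(value: Part, default: int) -> int:
    """Treat None or an empty string (a blank CSV cell) as missing."""
    return default if value is None or value == "" else int(value)


def build_birth_datetime(
    year_of_birth: Union[int, str], month_of_birth: Part = None, day_of_birth: Part = None
) -> datetime:
    """Concatenate year, month and day into 'YYYY-MM-DD' and parse it; missing parts become 01."""
    year = int(year_of_birth)
    month = _or_default(month_of_birth, 1)
    day = _or_default(day_of_birth, 1)
    return datetime.strptime(f"{year:04d}-{month:02d}-{day:02d}", "%Y-%m-%d")


print(build_birth_datetime(1980))             # 1980-01-01 00:00:00
print(build_birth_datetime("1975", "7", ""))  # 1975-07-01 00:00:00
```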
- create_tmpdir()
Creates and returns a new temporary directory within the cache.
- Returns:
The path to the new temporary directory.
- Return type:
- property default_task: Optional[BaseTask]
Returns the default task for the dataset.
- Returns:
The default task, if any.
- Return type:
Optional[BaseTask]
- get_patient(patient_id)
Retrieves a Patient object for the given patient ID.
- Parameters:
patient_id (str) – The ID of the patient to retrieve.
- Returns:
The Patient object for the given ID.
- Return type:
Patient
- Raises:
AssertionError – If the patient ID is not found in the dataset.
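Because a missing ID surfaces as an AssertionError rather than a KeyError, code that probes IDs should catch AssertionError. The stand-in below mirrors that contract on a list of event dicts; get_patient_events is hypothetical and returns raw events rather than a Patient object.

```python
from typing import Dict, List

Event = Dict[str, str]


def get_patient_events(events: List[Event], patient_id: str) -> List[Event]:
    """Return all events for patient_id, raising AssertionError if there are none."""
    patient_events = [event for event in events if event["patient_id"] == patient_id]
    if not patient_events:
        # Raised explicitly (not via `assert`) so the check survives `python -O`.
        raise AssertionError(f"Patient {patient_id} not found in dataset")
    return patient_events


events = [
    {"patient_id": "123", "event_type": "visit_occurrence"},
    {"patient_id": "123", "event_type": "condition_occurrence"},
    {"patient_id": "456", "event_type": "visit_occurrence"},
]
print(len(get_patient_events(events, "123")))  # 2
try:
    get_patient_events(events, "999")
except AssertionError as err:
    print(err)  # Patient 999 not found in dataset
```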
- property global_event_df: LazyFrame
Returns the cached global event dataframe.
- Returns:
The global event dataframe, read lazily from the cache.
- Return type:
LazyFrame
- iter_patients(df=None)
Yields Patient objects for each unique patient in the dataset.
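One way to yield one group per unique patient is to sort by patient ID and group adjacent rows, as in this sketch. iter_patient_groups is a hypothetical helper over plain dicts; it yields (patient_id, events) pairs instead of Patient objects, and the df parameter of the real method is not modeled here.

```python
from itertools import groupby
from operator import itemgetter
from typing import Dict, Iterator, List, Tuple

Event = Dict[str, str]


def iter_patient_groups(events: List[Event]) -> Iterator[Tuple[str, List[Event]]]:
    """Yield (patient_id, events) once per unique patient, ordered by patient_id."""
    by_patient = itemgetter("patient_id")
    # groupby only merges adjacent items, so sort first; sorted() is stable,
    # so each patient's events keep their original relative order.
    for patient_id, group in groupby(sorted(events, key=by_patient), key=by_patient):
        yield patient_id, list(group)


events = [
    {"patient_id": "2", "event_type": "visit_occurrence"},
    {"patient_id": "1", "event_type": "visit_occurrence"},
    {"patient_id": "2", "event_type": "death"},
]
for patient_id, patient_events in iter_patient_groups(events):
    print(patient_id, [e["event_type"] for e in patient_events])
# 1 ['visit_occurrence']
# 2 ['visit_occurrence', 'death']
```

Being a generator, it materializes one patient's events at a time, which keeps memory flat when the caller processes patients sequentially.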
- load_data()
Loads data from the specified tables.
- Returns:
A concatenated lazy frame of all tables.
- Return type:
dd.DataFrame
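The description above does not spell out how tables with different columns are concatenated. A common approach, sketched here on plain dicts, is to stack them into one long event table tagged with event_type. The patient_id and event_type columns and the '{table}/{column}' prefix are assumptions for illustration, and the real method returns a Dask dataframe rather than a list.

```python
from typing import Dict, List


def concat_tables(tables: Dict[str, List[dict]], patient_key: str = "person_id") -> List[dict]:
    """Stack rows from several tables into one event list tagged with event_type."""
    events: List[dict] = []
    for table_name, rows in tables.items():
        for row in rows:
            event = {"patient_id": row[patient_key], "event_type": table_name}
            # Prefix the other columns with the table name so that same-named
            # columns from different tables do not collide.
            for column, value in row.items():
                if column != patient_key:
                    event[f"{table_name}/{column}"] = value
            events.append(event)
    return events


events = concat_tables({
    "person": [{"person_id": "1", "year_of_birth": "1980"}],
    "visit_occurrence": [{"person_id": "1", "visit_occurrence_id": "10"}],
})
print(events)
```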
- load_table(table_name)
Loads a table and processes joins if specified.
- Parameters:
table_name (str) – The name of the table to load.
- Returns:
The processed Dask dataframe for the table.
- Return type:
dd.DataFrame
- Raises:
ValueError – If the table is not found in the config.
FileNotFoundError – If the CSV file for the table or join is not found.
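The sketch below illustrates the documented error behavior and a left join on plain CSV files, using only the standard library. The CONFIG layout (file_path, join, on, columns) is a hypothetical stand-in for the YAML configuration, and the real method returns a Dask dataframe.

```python
import csv
import tempfile
from pathlib import Path
from typing import Any, Dict, List

Row = Dict[str, Any]

# Hypothetical config: table name -> CSV file plus optional left joins.
CONFIG: Dict[str, Dict[str, Any]] = {
    "person": {"file_path": "person.csv", "join": []},
    "visit_occurrence": {
        "file_path": "visit_occurrence.csv",
        "join": [{"file_path": "person.csv", "on": "person_id", "columns": ["year_of_birth"]}],
    },
}


def read_csv(path: Path) -> List[Row]:
    if not path.is_file():
        raise FileNotFoundError(f"CSV file not found: {path}")
    with path.open(newline="") as handle:
        return list(csv.DictReader(handle))


def load_table(root: str, table_name: str, config: Dict[str, Dict[str, Any]] = CONFIG) -> List[Row]:
    if table_name not in config:
        raise ValueError(f"Table {table_name} not found in config")
    spec = config[table_name]
    rows = read_csv(Path(root) / spec["file_path"])
    for join in spec.get("join", []):
        key = join["on"]
        # Index the joined file by key; assumes at most one row per key.
        lookup = {row[key]: row for row in read_csv(Path(root) / join["file_path"])}
        for row in rows:
            match = lookup.get(row[key], {})
            # Left join: unmatched rows are kept and get None for joined columns.
            for column in join["columns"]:
                row[column] = match.get(column)
    return rows


with tempfile.TemporaryDirectory() as root:
    Path(root, "person.csv").write_text("person_id,year_of_birth\n1,1980\n")
    Path(root, "visit_occurrence.csv").write_text("visit_occurrence_id,person_id\n10,1\n11,3\n")
    visits = load_table(root, "visit_occurrence")
print(visits)
```

In the demo, visit 11 refers to person 3, who is absent from person.csv, so its year_of_birth is None.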
- set_task(task=None, num_workers=None, input_processors=None, output_processors=None)
Processes the base dataset to generate the task-specific sample dataset. The cache structure is as follows:
{task_name}_{task_uuid}/        # Cached data for a specific task, keyed by task name, schema, and args
    task_df.ld/                 # Intermediate task dataframe based on the schema
    samples_{proc_uuid}.ld/     # Final processed samples after applying processors
        schema.pkl              # Saved SampleBuilder schema
        *.bin                   # Processed sample files
- Parameters:
task (Optional[BaseTask]) – The task to set. Uses default task if None.
num_workers (int) – Number of workers for multi-threading. Default is self.num_workers.
input_processors (Optional[Dict[str, FeatureProcessor]]) – Pre-fitted input processors. If provided, these will be used instead of creating new ones from task’s input_schema. Defaults to None.
output_processors (Optional[Dict[str, FeatureProcessor]]) – Pre-fitted output processors. If provided, these will be used instead of creating new ones from task’s output_schema. Defaults to None.
- Returns:
The generated sample dataset.
- Return type:
- Raises:
AssertionError – If no default task is found and task is None.
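A sketch of two behaviors described above: falling back to the default task (and raising AssertionError when none exists), and deriving a deterministic {task_name}_{task_uuid} folder name from the task name, schemas, and args. The hashing scheme (JSON plus uuid5), the helper names, and the schema and argument values are assumptions; PyHealth may compute task_uuid differently.

```python
import json
import uuid
from typing import Any, Dict, Optional


def resolve_task(task: Optional[Any], default_task: Optional[Any]) -> Any:
    """Fall back to the default task; raise AssertionError if neither is available."""
    if task is None:
        if default_task is None:
            raise AssertionError("No default task found and no task provided")
        task = default_task
    return task


def task_cache_dir_name(
    task_name: str,
    input_schema: Dict[str, str],
    output_schema: Dict[str, str],
    task_args: Dict[str, Any],
) -> str:
    """Build a '{task_name}_{task_uuid}' folder name that changes whenever the inputs change."""
    # sort_keys makes the JSON (and therefore the UUID) independent of dict ordering.
    payload = json.dumps(
        {"name": task_name, "input_schema": input_schema,
         "output_schema": output_schema, "args": task_args},
        sort_keys=True,
    )
    task_uuid = uuid.uuid5(uuid.NAMESPACE_OID, payload)
    return f"{task_name}_{task_uuid}"


inputs, outputs = {"conditions": "sequence"}, {"mortality": "binary"}
name_a = task_cache_dir_name("mortality_prediction", inputs, outputs, {})
name_b = task_cache_dir_name("mortality_prediction", inputs, outputs, {})
name_c = task_cache_dir_name("mortality_prediction", inputs, outputs, {"window": 30})
print(name_a == name_b, name_a == name_c)  # True False
```

Because the name depends only on its inputs, rerunning the same task maps to the same cache folder, while changing any argument yields a new one.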