pyhealth.datasets.FHIRDataset
A generic, config-driven NDJSON ingest for HL7 FHIR datasets. The whole pipeline is described by a single YAML config with three top-level sections: which files to read, how to turn each FHIR resource into a flat row, and how those rows appear as events downstream. A custom FHIR ingest therefore comes down to pointing the dataset at a YAML file; no custom parsing code is required.
The bundled MIMIC4FHIR subclass uses this engine
with the pyhealth/datasets/fhir/configs/mimic4fhir.yaml config tuned for
PhysioNet’s MIMIC-IV on FHIR export. See the sub-page below for the quick-start.
Quick start
from pyhealth.datasets import MIMIC4FHIR, get_dataloader, split_by_patient
from pyhealth.tasks.mpf_clinical_prediction import MPFClinicalPredictionTask
from pyhealth.models import EHRMambaCEHR
from pyhealth.trainer import Trainer


def main():
    # Point the bundled MIMIC-IV on FHIR config at the local export directory.
    ds = MIMIC4FHIR(root="/data/mimic-iv-fhir")
    # Build (or reuse cached) task samples; see "Pipeline at a glance" below.
    sample_ds = ds.set_task(MPFClinicalPredictionTask(), num_workers=1)
    # Split by patient so that no patient appears in more than one split.
    train, val, test = split_by_patient(sample_ds, [0.7, 0.1, 0.2])
    vocab_size = sample_ds.input_processors["concept_ids"].vocab.vocab_size
    model = EHRMambaCEHR(dataset=sample_ds, vocab_size=vocab_size)
    Trainer(model=model).train(
        train_dataloader=get_dataloader(train, batch_size=8, shuffle=True),
        val_dataloader=get_dataloader(val, batch_size=8),
        epochs=2,
    )


if __name__ == "__main__":
    main()
(The if __name__ == "__main__": guard matters: set_task() starts Dask worker
processes, and without the guard each worker re-imports the script and tries to spawn workers again.)
Pipeline at a glance
NDJSON shards on disk
|
| (Phase A) — stream line by line, route by resourceType,
| project via the YAML's resource_specs
v
flattened_tables/<table>.parquet <- cache #1
|
| (Phase B) — load_table, dd.concat, sort by patient_id (Dask)
v
global_event_df.parquet/part-*.parquet <- cache #2
|
| (Phase C) — task_transform per-patient sample emit
v
task_df.ld/ <- cache #3a
|
| fit CehrProcessor vocab via SampleBuilder.fit(dataset)
| proc_transform per-sample tensorisation
v
samples_*.ld/ <- cache #3b ──> SampleDataset
Each of the three cache tiers has its own existence check, so re-running with
identical inputs skips every phase. The cache key is a hash of the YAML's byte digest,
the glob patterns, max_patients, and the engine schema version; changing any of them
invalidates that cache and everything below it. See
BaseDataset for the Phase B/C internals that are
shared with all other PyHealth datasets.
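As a rough illustration of how a key over those four inputs can be turned into the per-config UUID subdirectory mentioned under cache_dir, the sketch below hashes them deterministically. The function name `cache_key`, the JSON canonicalisation, the sorting of glob patterns, the `uuid.uuid5` namespace, and the `ENGINE_SCHEMA_VERSION` value are all assumptions for illustration. This is not PyHealth's implementation.

```python
import hashlib
import json
import uuid
from typing import Optional, Sequence

# Hypothetical engine schema version; the real constant is not documented here.
ENGINE_SCHEMA_VERSION = "1"


def cache_key(
    yaml_bytes: bytes,
    glob_patterns: Sequence[str],
    max_patients: Optional[int],
    engine_version: str = ENGINE_SCHEMA_VERSION,
) -> str:
    """Derive a deterministic UUID-style cache subdirectory name (illustrative sketch)."""
    payload = {
        # Byte digest of the YAML: any edit, even whitespace, changes the key.
        "yaml_sha256": hashlib.sha256(yaml_bytes).hexdigest(),
        # Sorted here (an assumption) so pattern order does not split the cache.
        "glob_patterns": sorted(glob_patterns),
        "max_patients": max_patients,
        "engine_version": engine_version,
    }
    # sort_keys makes the serialisation independent of dict insertion order.
    canonical = json.dumps(payload, sort_keys=True)
    return str(uuid.uuid5(uuid.NAMESPACE_URL, canonical))
```

Hashing the raw YAML bytes is deliberately conservative. Even a comment edit yields a fresh cache, but a stale flattened table can never be served for a changed config.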
The unified YAML config
A FHIR ingest YAML has three top-level sections. The bundled
mimic4fhir.yaml is the canonical worked example; what follows is the
section-by-section reference.
Section 1: glob_patterns: (which files to read)
glob_patterns:
- "**/MimicPatient*.ndjson.gz"
- "**/MimicEncounter*.ndjson.gz"
# ... one pattern per resource-type shard family
Defaults to ["**/*.ndjson.gz"] when omitted. Only worth setting when your
export has a per-resource-type file-naming convention you want to exploit for
speed — PhysioNet MIMIC-IV FHIR ships shards as MimicPatient*.ndjson.gz,
MimicEncounter*.ndjson.gz, etc., and filtering at the file level avoids
decompressing ~10% of the export that contains only unconfigured resource
types. For a generic export where everything is in bundles.ndjson.gz, omit
this block and the streamer will filter by resourceType after parsing.
Override at runtime via MIMIC4FHIR(glob_pattern=...) or
MIMIC4FHIR(glob_patterns=[...]); the two arguments are mutually exclusive.
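To see which shards a pattern list would select before running an ingest, a small stdlib check like the following can help. It uses `pathlib.Path.glob`, which supports the recursive `**` syntax used above. Whether PyHealth's streamer resolves patterns in exactly the same way is an assumption. `select_shards` and the file names in the demo are hypothetical.

```python
import tempfile
from pathlib import Path
from typing import List, Sequence


def select_shards(root: Path, patterns: Sequence[str]) -> List[Path]:
    """Return the sorted, de-duplicated files under root matching any pattern."""
    matched = set()
    for pattern in patterns:
        # In pathlib, "**/" matches this directory and all subdirectories.
        matched.update(p for p in root.glob(pattern) if p.is_file())
    return sorted(matched)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "fhir").mkdir()
        for name in ["MimicPatient.ndjson.gz", "MimicEncounter.ndjson.gz",
                     "OtherResource.ndjson.gz"]:
            (root / "fhir" / name).touch()
        picked = select_shards(root, ["**/MimicPatient*.ndjson.gz",
                                      "**/MimicEncounter*.ndjson.gz"])
        # The unconfigured OtherResource shard is never opened.
        print([p.name for p in picked])
```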
Section 2: resource_specs: (how to project JSON into rows)
Keys are FHIR resourceType strings. For each, declare a table name and
an ordered columns mapping:
resource_specs:
  Patient:
    table: patient
    columns:
      patient_id: { locate: ["id"], required: true }
      birth_date: { locate: ["birthDate"] }
      gender: { locate: ["gender"] }
      deceased_boolean: { locate: ["deceasedBoolean"], transform: bool_norm }
  Observation:
    table: observation
    columns:
      patient_id: { locate: ["subject.reference"], transform: ref_id, required: true }
      resource_id: { locate: ["id"] }
      encounter_id: { locate: ["encounter.reference"], transform: ref_id }
      event_time: { locate: ["effectiveDateTime", "effectivePeriod.start", "issued"] }
      concept_key: { locate: ["code"], transform: coding_key }
Each column entry has three fields:
- locate (required; list of dotted paths): Ordered JSON paths into the resource; the first that resolves to a non-null value wins. This is how FHIR choice types (onset[x], effective[x], performed[x], …) are handled: list every variant explicitly. A single string is accepted as shorthand for a one-element list.
- transform (optional; name of a built-in transform; default identity): Maps the located leaf to a flat scalar string. See the registry below.
- required (optional; bool; default false): When true, a resource whose locate cannot be resolved is dropped (and logged) rather than emitted with a null. Use this on the patient reference column so that events without a discoverable patient never reach the global event frame.
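The locate and required semantics fit in a few lines of plain Python. The sketch below is a hypothetical re-implementation for illustration. `resolve_path`, `locate`, and `project` are not PyHealth names. It assumes that dotted segments only walk into JSON objects, never into list indices, and it does not apply transform.

```python
from typing import Any, Dict, List, Mapping, Optional, Union

Locate = Union[str, List[str]]


def resolve_path(resource: Mapping[str, Any], dotted: str) -> Any:
    """Walk a dotted path such as "subject.reference"; None if any hop is missing."""
    node: Any = resource
    for key in dotted.split("."):
        if not isinstance(node, Mapping) or key not in node:
            return None
        node = node[key]
    return node


def locate(resource: Mapping[str, Any], paths: Locate) -> Any:
    """Return the first non-null value among the candidate paths (choice types)."""
    if isinstance(paths, str):  # string shorthand for a one-element list
        paths = [paths]
    for path in paths:
        value = resolve_path(resource, path)
        if value is not None:
            return value
    return None


def project(
    resource: Mapping[str, Any], columns: Mapping[str, Mapping[str, Any]]
) -> Optional[Dict[str, Any]]:
    """Project one resource to a row; None (drop) if a required column is unresolved."""
    row: Dict[str, Any] = {}
    for name, spec in columns.items():
        value = locate(resource, spec["locate"])
        if value is None and spec.get("required", False):
            return None  # per the docs, the real engine also logs this drop
        row[name] = value
    return row
```

Without a transform, subject.reference comes back as a string such as "Patient/p1". This is why the YAML above applies ref_id to that column: the resulting value has to line up with the bare Patient.id used for patient_id on the patient table.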
Transform registry
Available transforms (defined in the TRANSFORMS dict in
pyhealth/datasets/fhir/utils.py):
| Transform | Behaviour |
|---|---|
| identity | Pass the value through. Stringifies non-string scalars. |
| ref_id | Reference object or … |
| coding_key | CodeableConcept -> … |
| bool_norm | JSON boolean / … |
| … | MedicationRequest medication[x] -> codeable-concept or … |
Adding a new transform is a one-liner: register a callable in TRANSFORMS
in utils.py and reference it by name from the YAML.
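The registry pattern itself looks roughly like the sketch below. The transform bodies are illustrative guesses written for this example and are not copied from pyhealth/datasets/fhir/utils.py. In particular, the assumed behaviour of `ref_id` (return the segment after the last "/") should be checked against that file. `strip_upper` is a hypothetical custom transform, and so is `apply_transform`.

```python
from typing import Any, Callable, Dict, Mapping, Optional

Transform = Callable[[Any], Optional[str]]


def identity(value: Any) -> Optional[str]:
    """Pass the value through, stringifying non-string scalars."""
    if value is None:
        return None
    return value if isinstance(value, str) else str(value)


def ref_id(value: Any) -> Optional[str]:
    """Illustrative guess: accept a Reference object or string, return the trailing id."""
    if isinstance(value, Mapping):
        value = value.get("reference")
    if not isinstance(value, str) or not value:
        return None
    return value.rsplit("/", 1)[-1]


# Name -> callable; the YAML refers to transforms by these keys.
TRANSFORMS: Dict[str, Transform] = {"identity": identity, "ref_id": ref_id}

# Adding a transform is one registration (hypothetical example name).
TRANSFORMS["strip_upper"] = lambda v: None if v is None else str(v).strip().upper()


def apply_transform(name: Optional[str], value: Any) -> Optional[str]:
    """Look up a transform by name, defaulting to identity as the YAML does."""
    try:
        fn = TRANSFORMS[name or "identity"]
    except KeyError:
        raise ValueError(f"unknown transform: {name!r}") from None
    return fn(value)
```

Raising on an unknown name is a design choice in this sketch. A misspelled transform then fails loudly instead of silently passing values through.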
Section 3: tables: (how rows are exposed as events)
Keys here must match the table: values from Section 2. Each entry tells
load_table() how to read the flat parquet:
tables:
  patient:
    file_path: "patient.parquet"
    patient_id: "patient_id"
    timestamp: "birth_date"
    attributes: ["birth_date", "gender", "deceased_boolean"]
  observation:
    file_path: "observation.parquet"
    patient_id: "patient_id"
    timestamp: "event_time"
    attributes: ["resource_id", "encounter_id", "event_time", "concept_key"]
file_path is the flat-table filename (parquet by default) inside the cached
flattened_tables/ directory. patient_id and timestamp name the
columns to surface as the normalised patient_id and timestamp on each
event. attributes lists the columns surfaced as event attributes. In
the global event frame they are renamed to {table}/{attr}, and they later show up
as attributes on the events returned by patient.get_events(event_type=...), e.g. event.attr_name.
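A minimal sketch of the renaming step, using plain dicts in place of the Dask/Polars frames. The output columns patient_id, event_type, and timestamp follow the usual PyHealth global event frame layout. Using the table name as the event type is an assumption here, and `to_event` is a hypothetical helper, not the engine's code.

```python
from typing import Any, Dict, Mapping, Sequence


def to_event(
    table: str,
    row: Mapping[str, Any],
    patient_id_col: str,
    timestamp_col: str,
    attributes: Sequence[str],
) -> Dict[str, Any]:
    """Map one flat-table row to a global-event-frame row (illustrative sketch)."""
    event: Dict[str, Any] = {
        "patient_id": row[patient_id_col],
        "event_type": table,  # assumed: the table name doubles as the event type
        "timestamp": row[timestamp_col],
    }
    for attr in attributes:
        # The table prefix keeps e.g. patient/birth_date and
        # observation/event_time from colliding in one wide frame.
        event[f"{table}/{attr}"] = row.get(attr)
    return event
```

The prefix exists only in the global frame. According to the description above, an event of type observation exposes the value as event.concept_key.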
Cross-section validation
At load time the dataset checks that every table: value declared in
Section 2 has a matching tables.<name> block in Section 3. A typo therefore surfaces
as a config error at startup instead of as a silently empty parquet file.
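The check is essentially a set difference between the two sections. The sketch below operates on an already-parsed config dict, for example the output of `yaml.safe_load`. `validate_config` and its error messages are hypothetical.

```python
from typing import Any, Mapping


def validate_config(config: Mapping[str, Any]) -> None:
    """Raise ValueError if a resource_specs table has no tables: block (sketch)."""
    specs = config.get("resource_specs") or {}
    if not specs:
        # The API reference states the YAML must contain a resource_specs: block.
        raise ValueError("config has no resource_specs: block")
    declared = {rtype: spec["table"] for rtype, spec in specs.items()}
    defined = set((config.get("tables") or {}).keys())
    missing = sorted(
        f"{rtype} -> {table}"
        for rtype, table in declared.items()
        if table not in defined
    )
    if missing:
        raise ValueError(
            "resource_specs tables with no tables: block: " + ", ".join(missing)
        )
```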
Customising for a non-MIMIC FHIR export
Step 1: write your YAML.
Copy pyhealth/datasets/fhir/configs/mimic4fhir.yaml and adapt the
resource_specs: and tables: blocks for the resources you care about.
For an export that adds Immunizations:
resource_specs:
  Patient:
    table: patient
    columns:
      patient_id: { locate: ["id"], required: true }
      birth_date: { locate: ["birthDate"] }
  Immunization:
    table: immunization
    columns:
      patient_id: { locate: ["patient.reference"], transform: ref_id, required: true }
      resource_id: { locate: ["id"] }
      event_time: { locate: ["occurrenceDateTime", "recorded"] }
      concept_key: { locate: ["vaccineCode"], transform: coding_key }
tables:
  patient:
    file_path: "patient.parquet"
    patient_id: "patient_id"
    timestamp: "birth_date"
    attributes: ["birth_date"]
  immunization:
    file_path: "immunization.parquet"
    patient_id: "patient_id"
    timestamp: "event_time"
    attributes: ["resource_id", "event_time", "concept_key"]
Step 2: instantiate
Either pass config_path=... directly:
from pyhealth.datasets import FHIRDataset

ds = FHIRDataset(
    root="/data/my_fhir_export",
    config_path="/path/to/my_export.yaml",
)
or write a 3-line subclass that bundles your config:
from pyhealth.datasets import FHIRDataset


class MyFHIR(FHIRDataset):
    # Bundled default; an explicit config_path=... still takes precedence.
    DEFAULT_CONFIG_PATH = "/path/to/my_export.yaml"


ds = MyFHIR(root="/data/my_fhir_export")
Step 3: that’s it.
Everything downstream — set_task(),
iter_patients(),
get_patient() — works the same as for any
other PyHealth dataset.
Notes on resource use
Streaming ingest avoids loading the whole NDJSON corpus into RAM, but downstream
steps still scale with cohort size. For a smoke run, the bundled example
fixtures fit on any laptop. For a laptop-scale real subset, set
max_patients= and/or narrow glob_patterns to keep the cache and task passes
manageable; 16 GB or more of system RAM is a comfortable target for Polars plus the trainer.
For the full PhysioNet export, prefer a fast SSD, a large disk, and plenty of
RAM: total work scales with corpus size even though ingest memory stays bounded.
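For the max_patients knob, the API reference below describes the behaviour as "limit ingest to the first N unique patient IDs". With dev=True, the limit becomes 1000 when max_patients is None. One way a single streaming pass could honour such a limit is sketched below. `PatientLimiter`, `limit_stream`, and `effective_max_patients` are hypothetical. The sketch also assumes that each record's patient ID is already known, for example from the required patient column, and that "first" means first seen in stream order, so the resulting cohort depends on shard order.

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Set, Tuple

DEV_PATIENT_LIMIT = 1000  # dev=True cap documented in the API reference


def effective_max_patients(max_patients: Optional[int], dev: bool) -> Optional[int]:
    """dev=True caps the cohort at 1000 patients unless max_patients is given."""
    if dev and max_patients is None:
        return DEV_PATIENT_LIMIT
    return max_patients


class PatientLimiter:
    """Admit records for at most max_patients distinct IDs, in first-seen order."""

    def __init__(self, max_patients: Optional[int]) -> None:
        self.max_patients = max_patients
        self.admitted: Set[str] = set()

    def admit(self, patient_id: str) -> bool:
        if self.max_patients is None:
            return True  # no limit configured
        if patient_id in self.admitted:
            return True  # patient already in the cohort
        if len(self.admitted) < self.max_patients:
            self.admitted.add(patient_id)
            return True
        return False  # cohort is full; skip this new patient


def limit_stream(
    records: Iterable[Tuple[str, Dict[str, Any]]], max_patients: Optional[int]
) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Lazily filter (patient_id, resource) pairs; memory grows with the cohort only."""
    limiter = PatientLimiter(max_patients)
    for patient_id, resource in records:
        if limiter.admit(patient_id):
            yield patient_id, resource
```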
Bundled FHIR datasets
API reference
- class pyhealth.datasets.FHIRDataset(root, config_path=None, glob_pattern=None, glob_patterns=None, output_format=None, max_patients=None, ingest_num_shards=None, cache_dir=None, num_workers=1, dev=False)
Bases: BaseDataset
FHIR resources flattened into per-type tables, then the standard pipeline.
Streams raw FHIR NDJSON/NDJSON.GZ exports into flattened tables (one per configured resource type) and pipelines them through BaseDataset for downstream task processing (global event dataframe, patient iteration, task sampling).
The entire ingest is driven by a single YAML config with three top-level sections: glob_patterns: (which NDJSON files to open), resource_specs: (how to project each FHIR resource type into a flat row), and tables: (how those rows are exposed as events downstream). See pyhealth/datasets/fhir/configs/mimic4fhir.yaml for a complete worked example and the FHIRDataset rst page for a section-by-section guide.
Pass config_path=... directly, or subclass and set DEFAULT_CONFIG_PATH to bundle a default (see MIMIC4FHIR).
- Parameters:
root (str) – Path to the NDJSON/NDJSON.GZ export directory.
config_path (Optional[str]) – Path to the FHIR ingest YAML. Defaults to the class attribute DEFAULT_CONFIG_PATH. The YAML must contain a resource_specs: block; any glob_patterns: and tables: blocks are also read from here.
glob_pattern (Optional[str]) – Single glob for NDJSON files; overrides the YAML’s glob_patterns. Mutually exclusive with glob_patterns.
glob_patterns (Optional[Sequence[str]]) – Multiple glob patterns; overrides the YAML’s glob_patterns. Mutually exclusive with glob_pattern.
output_format (Optional[str]) – Flat-table format, one of parquet (default), csv, tsv. Defaults to the class attribute DEFAULT_OUTPUT_FORMAT.
max_patients (Optional[int]) – Limit ingest to the first N unique patient IDs.
ingest_num_shards (Optional[int]) – Ignored; retained for API compatibility.
cache_dir (Union[str, Path, None]) – Cache directory root (a UUID subdirectory is appended per config).
num_workers (int) – Worker processes for task sampling.
dev (bool) – Development mode; limits ingest to 1000 patients if max_patients is None.
Examples
>>> # ad-hoc, no subclass
>>> ds = FHIRDataset(
...     root="/data/fhir",
...     config_path="my_fhir.yaml",
... )
>>> # or a preconfigured source subclass
>>> from pyhealth.datasets import MIMIC4FHIR
>>> ds = MIMIC4FHIR(root="/data/mimic-iv-fhir", max_patients=500)
- DEFAULT_CONFIG_PATH: Optional[str] = None
Default ingest YAML path; set by source subclasses to bundle a config.
- create_tmpdir()
Creates and returns a new temporary directory within the cache.
- Returns:
The path to the new temporary directory.
- Return type:
- property default_task: Optional[BaseTask]
Returns the default task for the dataset.
- Returns:
The default task, if any.
- Return type:
Optional[BaseTask]
- get_patient(patient_id)
Retrieves a Patient object for the given patient ID.
- Parameters:
patient_id (str) – The ID of the patient to retrieve.
- Returns:
The Patient object for the given ID.
- Return type:
- Raises:
AssertionError – If the patient ID is not found in the dataset.
- property global_event_df: LazyFrame
Returns the path to the cached event dataframe.
- Returns:
The path to the cached event dataframe.
- Return type:
- iter_patients(df=None)
Yields Patient objects for each unique patient in the dataset.
- load_data()
Loads data from the specified tables.
- Returns:
A concatenated lazy frame of all tables.
- Return type:
dd.DataFrame
- set_task(task=None, num_workers=None, input_processors=None, output_processors=None)
Processes the base dataset to generate the task-specific sample dataset. The cache structure is as follows:
{task_name}_{task_uuid}/      # Cached data for a specific task, based on task name, schema, and args
    task_df.ld/               # Intermediate task dataframe based on schema
    samples_{proc_uuid}.ld/   # Final processed samples after applying processors
        schema.pkl            # Saved SampleBuilder schema
        *.bin                 # Processed sample files
- Parameters:
task (Optional[BaseTask]) – The task to set. Uses default task if None.
num_workers (int) – Number of worker processes for task sampling. Default is self.num_workers.
input_processors (Optional[Dict[str, FeatureProcessor]]) – Pre-fitted input processors. If provided, these will be used instead of creating new ones from task’s input_schema. Defaults to None.
output_processors (Optional[Dict[str, FeatureProcessor]]) – Pre-fitted output processors. If provided, these will be used instead of creating new ones from task’s output_schema. Defaults to None.
- Returns:
The generated sample dataset.
- Return type:
- Raises:
AssertionError – If no default task is found and task is None.
- load_table(table_name)
Load one flattened table into the standard event schema.
Deviations from BaseDataset.load_table (which reads CSV via _scan_csv_tsv_gz):
  - Reads pre-built flat tables (parquet/csv/tsv) under prepared_tables_dir.
  - Parses timestamps with errors="coerce" and utc=True, because FHIR ISO strings may carry a timezone suffix or be partial dates.
  - Strips tz-aware timestamps to naive UTC for Dask compatibility.
  - Drops rows with a null patient_id before returning.
- Return type:
DataFrame
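The timestamp rules above can be approximated in the standard library, which makes the intended semantics easy to test in isolation. This is a sketch and not the method's code: the real method works on Dask/pandas columns through to_datetime(errors="coerce", utc=True). In the sketch, unparseable values become None (standing in for NaT). Partial dates are assumed to pad to the first day of the month or year, and naive timestamps are assumed to already be UTC. `parse_fhir_datetime` and `drop_null_patients` are hypothetical names.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


def parse_fhir_datetime(value: Any) -> Optional[datetime]:
    """Parse a FHIR date/dateTime string to a naive UTC datetime, or None."""
    if not isinstance(value, str) or not value.strip():
        return None
    text = value.strip()
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on.
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    # Partial dates (YYYY, YYYY-MM) are padded to the first day.
    for fmt in ("%Y", "%Y-%m"):
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            pass
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None  # stands in for errors="coerce" -> NaT
    if parsed.tzinfo is not None:
        # Convert to UTC, then drop tzinfo so every value is naive UTC.
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


def drop_null_patients(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep only rows that carry a patient_id."""
    return [row for row in rows if row.get("patient_id") is not None]
```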