pyhealth.datasets.FHIRDataset

A generic, config-driven NDJSON ingest for HL7 FHIR datasets. The whole pipeline is described by a single YAML config with three top-level sections: which files to read, how to turn each FHIR resource into a flat row, and how those rows appear as events downstream. A custom FHIR ingest therefore amounts to pointing the dataset at a YAML file; no Python is required unless you need a new transform.

The bundled MIMIC4FHIR subclass uses this engine with the pyhealth/datasets/fhir/configs/mimic4fhir.yaml config, tuned for PhysioNet’s MIMIC-IV on FHIR export. The quick start below uses it; its own sub-page is listed under Bundled FHIR datasets.

Quick start

from pyhealth.datasets import MIMIC4FHIR, get_dataloader, split_by_patient
from pyhealth.tasks.mpf_clinical_prediction import MPFClinicalPredictionTask
from pyhealth.models import EHRMambaCEHR
from pyhealth.trainer import Trainer

def main():
    # Ingest the MIMIC-IV on FHIR export (or reuse the cached tiers).
    ds = MIMIC4FHIR(root="/data/mimic-iv-fhir")
    # Emit per-patient samples and fit the input processors.
    sample_ds = ds.set_task(MPFClinicalPredictionTask(), num_workers=1)
    # Split by patient so no patient appears in more than one split;
    # `test` is held out for evaluation.
    train, val, test = split_by_patient(sample_ds, [0.7, 0.1, 0.2])
    # Pass the size of the fitted concept vocabulary to the model.
    vocab_size = sample_ds.input_processors["concept_ids"].vocab.vocab_size
    model = EHRMambaCEHR(dataset=sample_ds, vocab_size=vocab_size)
    Trainer(model=model).train(
        train_dataloader=get_dataloader(train, batch_size=8, shuffle=True),
        val_dataloader=get_dataloader(val, batch_size=8),
        epochs=2,
    )

if __name__ == "__main__":
    main()

(The if __name__ == "__main__": guard matters: set_task() starts Dask worker processes, which re-import the main script; without the guard, each worker would re-run main() and spawn workers of its own.)

Pipeline at a glance

NDJSON shards on disk
    |
    |  (Phase A) — stream line by line, route by resourceType,
    |             project via the YAML's resource_specs
    v
flattened_tables/<table>.parquet         <- cache #1
    |
    |  (Phase B) — load_table, dd.concat, sort by patient_id (Dask)
    v
global_event_df.parquet/part-*.parquet   <- cache #2
    |
    |  (Phase C) — task_transform per-patient sample emit
    v
task_df.ld/        <- cache #3a
    |
    |  fit CehrProcessor vocab via SampleBuilder.fit(dataset)
    |  proc_transform per-sample tensorisation
    v
samples_*.ld/      <- cache #3b   ──>   SampleDataset

Each of the three cache tiers has its own existence check, so re-running with identical inputs skips every phase. Cache identity is a hash over the YAML's byte digest, the glob patterns, max_patients, and the engine schema version. Changing any of them (even a whitespace-only edit to the YAML, since the digest covers raw bytes) invalidates everything downstream of it. See BaseDataset for the Phase B/C internals shared with all other PyHealth datasets.
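To make the identity rule concrete, here is a minimal sketch, assuming the cache key is a UUID derived from those four inputs. The function cache_key, the JSON payload layout, the uuid5 namespace, and SCHEMA_VERSION are hypothetical and not PyHealth's implementation; the point is only that a one-byte YAML edit, a different glob list, or a different max_patients yields a different cache directory name.

```python
import hashlib
import json
import uuid
from typing import Optional, Sequence

# Hypothetical stand-in for the engine schema version.
SCHEMA_VERSION = "1"


def cache_key(yaml_bytes: bytes, glob_patterns: Sequence[str],
              max_patients: Optional[int],
              schema_version: str = SCHEMA_VERSION) -> str:
    """Derive a deterministic cache identifier from the ingest inputs."""
    payload = json.dumps(
        {
            # Digest of the raw YAML bytes: any byte-level edit changes it.
            "yaml_sha256": hashlib.sha256(yaml_bytes).hexdigest(),
            "glob_patterns": list(glob_patterns),
            "max_patients": max_patients,
            "schema_version": schema_version,
        },
        sort_keys=True,  # canonical key order, so equal inputs hash equally
    )
    # uuid5 maps the canonical payload to a stable UUID string, usable as a
    # per-config cache subdirectory name.
    return str(uuid.uuid5(uuid.NAMESPACE_URL, payload))
```

Because every tier would live under the directory named by this key, a changed key simply points at an empty directory and every phase reruns.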

The unified YAML config

A FHIR ingest YAML has three top-level sections. The bundled mimic4fhir.yaml is the canonical worked example; what follows is the section-by-section reference.

Section 1: glob_patterns: (which files to read)

glob_patterns:
  - "**/MimicPatient*.ndjson.gz"
  - "**/MimicEncounter*.ndjson.gz"
  # ... one pattern per resource-type shard family

Defaults to ["**/*.ndjson.gz"] when omitted. Set it only when your export has a per-resource-type file-naming convention you want to exploit for speed: PhysioNet MIMIC-IV FHIR ships shards as MimicPatient*.ndjson.gz, MimicEncounter*.ndjson.gz, etc., and filtering at the file level avoids decompressing the ~10% of the export that contains only unconfigured resource types. For a generic export where everything is in bundles.ndjson.gz, omit this block; the streamer then filters by resourceType after parsing each line.
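Phase A's file selection and resourceType routing can be sketched with the standard library alone. select_shards and stream_resources are hypothetical helpers, not the engine's API, and they assume gzip-compressed NDJSON. A shard that matches no pattern is never opened, whereas under the catch-all default every shard is decompressed and every line parsed before unconfigured types are dropped.

```python
import gzip
import json
from pathlib import Path
from typing import Iterator, List, Sequence, Set, Tuple


def select_shards(root: str, patterns: Sequence[str]) -> List[Path]:
    """Expand every glob under root; de-duplicate and sort for a stable order."""
    found = set()
    for pattern in patterns:
        found.update(p for p in Path(root).glob(pattern) if p.is_file())
    return sorted(found)


def stream_resources(shards: Sequence[Path],
                     wanted: Set[str]) -> Iterator[Tuple[str, dict]]:
    """Yield (resourceType, resource) pairs one NDJSON line at a time."""
    for shard in shards:
        # Text-mode gzip.open decompresses incrementally, so memory stays flat.
        with gzip.open(shard, "rt", encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                resource = json.loads(line)
                rtype = resource.get("resourceType")
                # Types without a resource_specs entry are skipped here.
                if rtype in wanted:
                    yield rtype, resource
```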

Override at runtime via MIMIC4FHIR(glob_pattern=...) or MIMIC4FHIR(glob_patterns=[...]).
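The override order and the mutual exclusivity of the two arguments (stated in the parameter list of the API reference) reduce to a small rule. resolve_glob_patterns is a hypothetical helper; the real constructor's error type and message may differ, and treating an empty YAML list as omitted is an assumption of this sketch.

```python
from typing import List, Optional, Sequence

DEFAULT_GLOBS = ["**/*.ndjson.gz"]


def resolve_glob_patterns(glob_pattern: Optional[str] = None,
                          glob_patterns: Optional[Sequence[str]] = None,
                          yaml_patterns: Optional[Sequence[str]] = None) -> List[str]:
    """Pick the effective patterns: runtime argument, then YAML, then default."""
    if glob_pattern is not None and glob_patterns is not None:
        raise ValueError("pass glob_pattern or glob_patterns, not both")
    if glob_pattern is not None:
        return [glob_pattern]  # single pattern becomes a one-element list
    if glob_patterns is not None:
        return list(glob_patterns)
    if yaml_patterns:  # assumption: an empty YAML list counts as omitted
        return list(yaml_patterns)
    return list(DEFAULT_GLOBS)
```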

Section 2: resource_specs: (how to project JSON into rows)

Keys are FHIR resourceType strings. For each, declare a table name and an ordered columns mapping:

resource_specs:

  Patient:
    table: patient
    columns:
      patient_id:        { locate: ["id"], required: true }
      birth_date:        { locate: ["birthDate"] }
      gender:            { locate: ["gender"] }
      deceased_boolean:  { locate: ["deceasedBoolean"], transform: bool_norm }

  Observation:
    table: observation
    columns:
      patient_id:    { locate: ["subject.reference"], transform: ref_id, required: true }
      resource_id:   { locate: ["id"] }
      encounter_id:  { locate: ["encounter.reference"], transform: ref_id }
      event_time:    { locate: ["effectiveDateTime", "effectivePeriod.start", "issued"] }
      concept_key:   { locate: ["code"], transform: coding_key }

Each column entry has three fields:

locate (required, list of dotted paths)

Ordered JSON paths into the resource; the first that resolves to a non-null value wins. This is how FHIR choice-types (onset[x], effective[x], performed[x], …) are handled — list every variant explicitly. A single string is accepted as shorthand for a one-element list.

transform (optional, name of a built-in transform, default identity)

Maps the located leaf to a flat scalar string. See the registry below.

required (optional, bool, default false)

When true, a resource whose locate cannot be resolved is dropped (and logged) rather than emitted with a null. Use this on the patient reference column so events without a discoverable patient never reach the global event frame.
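Taken together, the three fields describe a small per-resource projection. The sketch below is a hypothetical, standard-library reimplementation (locate, project_row, and the two inline transforms are illustrative names, not the engine's code). It tries each dotted path in order, applies the named transform to the first non-null hit, and drops the whole resource when a required column cannot be found. Unlike real FHIR paths, it does not step into JSON arrays.

```python
import logging
from typing import Any, Callable, Dict, Optional, Sequence, Union

log = logging.getLogger("fhir_projection_sketch")


def _ref_id(value: Any) -> Optional[str]:
    # Illustrative: "Patient/p1" or {"reference": "Patient/p1"} -> "p1".
    ref = value.get("reference") if isinstance(value, dict) else value
    return str(ref).rsplit("/", 1)[-1] if ref else None


# Two illustrative transforms only.
TRANSFORMS: Dict[str, Callable[[Any], Optional[str]]] = {
    "identity": lambda v: v if isinstance(v, str) else str(v),
    "ref_id": _ref_id,
}


def locate(resource: dict, paths: Union[str, Sequence[str]]) -> Any:
    """Return the value at the first dotted path that resolves to non-null."""
    if isinstance(paths, str):  # shorthand for a one-element list
        paths = [paths]
    for path in paths:
        node: Any = resource
        for key in path.split("."):
            # Only nested objects are walked; arrays resolve to None here.
            node = node.get(key) if isinstance(node, dict) else None
        if node is not None:
            return node
    return None


def project_row(resource: dict,
                columns: Dict[str, dict]) -> Optional[Dict[str, Optional[str]]]:
    """Flatten one resource into a row, or None if a required column is missing."""
    row: Dict[str, Optional[str]] = {}
    for name, spec in columns.items():  # dicts preserve YAML column order
        raw = locate(resource, spec["locate"])
        if raw is None:
            if spec.get("required", False):
                log.info("dropping %s/%s: required column %r not found",
                         resource.get("resourceType"), resource.get("id"), name)
                return None
            row[name] = None
            continue
        row[name] = TRANSFORMS[spec.get("transform", "identity")](raw)
    return row
```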

Transform registry

Available transforms (defined in the TRANSFORMS dict in pyhealth/datasets/fhir/utils.py):

identity

Pass the value through. Stringifies non-string scalars.

ref_id

Reference object or "Patient/p1" -> "p1".

coding_key

CodeableConcept -> "system|code" of its first coding.

bool_norm

JSON boolean / "true"/"false" -> "true"/"false"/None.

med_concept

MedicationRequest medication[x] -> key from the medication CodeableConcept, or a "MedicationRequest/reference|<id>" fallback when the medication is given as a reference.

Adding a new transform is a one-liner: register a callable in TRANSFORMS in utils.py and reference it by name from the YAML.
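The registry pattern can be sketched as a plain dict from names to callables. These four functions are hypothetical reimplementations written from the one-line descriptions above (med_concept is omitted), so edge-case behaviour, such as the case-insensitive booleans here, is an assumption; the real TRANSFORMS callables may have different signatures.

```python
from typing import Any, Callable, Dict, Optional

Transform = Callable[[Any], Optional[str]]


def identity(value: Any) -> Optional[str]:
    # Pass strings through; stringify other scalars.
    if value is None:
        return None
    return value if isinstance(value, str) else str(value)


def ref_id(value: Any) -> Optional[str]:
    # Accept a Reference object {"reference": "Patient/p1"} or the bare string.
    ref = value.get("reference") if isinstance(value, dict) else value
    return ref.rsplit("/", 1)[-1] if isinstance(ref, str) and ref else None


def coding_key(value: Any) -> Optional[str]:
    # CodeableConcept -> "system|code" built from its first coding entry.
    codings = value.get("coding") if isinstance(value, dict) else None
    if not codings:
        return None
    first = codings[0]
    return f"{first.get('system', '')}|{first.get('code', '')}"


def bool_norm(value: Any) -> Optional[str]:
    # JSON booleans and "true"/"false" strings (case-insensitive here, an
    # assumption) -> "true"/"false"; anything else -> None.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, str) and value.lower() in ("true", "false"):
        return value.lower()
    return None


TRANSFORMS: Dict[str, Transform] = {
    "identity": identity,
    "ref_id": ref_id,
    "coding_key": coding_key,
    "bool_norm": bool_norm,
}

# Registering a new transform is one entry; the YAML then names it, e.g.
# `transform: lower_code` (a hypothetical example transform).
TRANSFORMS["lower_code"] = lambda v: None if v is None else str(v).lower()
```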

Section 3: tables: (how rows are exposed as events)

Keys here must match the table: values from Section 2. Each entry tells load_table() how to read the flat parquet:

tables:
  patient:
    file_path: "patient.parquet"
    patient_id: "patient_id"
    timestamp: "birth_date"
    attributes: ["birth_date", "gender", "deceased_boolean"]

  observation:
    file_path: "observation.parquet"
    patient_id: "patient_id"
    timestamp: "event_time"
    attributes: ["resource_id", "encounter_id", "event_time", "concept_key"]

file_path is the flat-table filename (a parquet file by default) inside the cached flattened_tables/ directory. patient_id and timestamp name the columns surfaced as the normalised patient_id and timestamp of each event. attributes lists the columns surfaced as event attributes: in the global event frame they are renamed to {table}/{attr}, and downstream each event returned by patient.get_events(event_type=...) exposes them as event.attr_name.
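A minimal sketch of the row-to-event mapping, assuming plain dicts in place of the real dataframes. The patient_id and timestamp columns are looked up by the names the tables: entry gives, each attribute gets the {table}/{attr} prefix, and event_type is taken to be the table key (an assumption of this sketch). to_event_rows is a hypothetical helper, not part of the PyHealth API.

```python
from typing import Any, Dict, List


def to_event_rows(table: str, spec: Dict[str, Any],
                  rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Map flat-table rows onto the event schema described by one tables: entry."""
    events = []
    for row in rows:
        event = {
            "patient_id": row[spec["patient_id"]],
            "event_type": table,  # assumption: event type == table key
            "timestamp": row[spec["timestamp"]],
        }
        for attr in spec["attributes"]:
            # Namespacing by table keeps columns from different tables apart.
            event[f"{table}/{attr}"] = row.get(attr)
        events.append(event)
    return events
```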

Cross-section validation

At load time the dataset checks that every table: value declared in Section 2 has a matching tables.<name> block in Section 3. Typos surface as a config error at startup, not silent empty parquets.
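The check itself is simple. Here is a sketch that works on the parsed YAML as plain dicts (for example the output of yaml.safe_load). check_table_links is a hypothetical name, and collecting every mismatch before failing is a design choice of this sketch, so that one run reports all typos at once.

```python
from typing import Any, Dict, List


def check_table_links(config: Dict[str, Any]) -> List[str]:
    """Return one message per resource_specs table that has no tables: block."""
    declared = config.get("tables") or {}
    errors = []
    for rtype, spec in (config.get("resource_specs") or {}).items():
        name = spec.get("table")
        if name not in declared:
            errors.append(
                f"resource_specs.{rtype}.table = {name!r} has no tables.{name} block"
            )
    return errors
```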

Customising for a non-MIMIC FHIR export

Step 1 — write your YAML.

Copy pyhealth/datasets/fhir/configs/mimic4fhir.yaml and adapt the resource_specs: and tables: blocks for the resources you care about. For an export that adds Immunizations:

resource_specs:
  Patient:
    table: patient
    columns:
      patient_id: { locate: ["id"], required: true }
      birth_date: { locate: ["birthDate"] }
  Immunization:
    table: immunization
    columns:
      patient_id:   { locate: ["patient.reference"], transform: ref_id, required: true }
      resource_id:  { locate: ["id"] }
      event_time:   { locate: ["occurrenceDateTime", "recorded"] }
      concept_key:  { locate: ["vaccineCode"], transform: coding_key }

tables:
  patient:
    file_path: "patient.parquet"
    patient_id: "patient_id"
    timestamp: "birth_date"
    attributes: ["birth_date"]
  immunization:
    file_path: "immunization.parquet"
    patient_id: "patient_id"
    timestamp: "event_time"
    attributes: ["resource_id", "event_time", "concept_key"]

Step 2 — instantiate

Either pass config_path=... directly:

from pyhealth.datasets import FHIRDataset

ds = FHIRDataset(
    root="/data/my_fhir_export",
    config_path="/path/to/my_export.yaml",
)

or write a 3-line subclass that bundles your config:

from pyhealth.datasets import FHIRDataset

class MyFHIR(FHIRDataset):
    DEFAULT_CONFIG_PATH = "/path/to/my_export.yaml"

ds = MyFHIR(root="/data/my_fhir_export")

Step 3 — that’s it.

Everything downstream — set_task(), iter_patients(), get_patient() — works the same as for any other PyHealth dataset.

Notes on resource use

Streaming ingest avoids loading the whole NDJSON corpus into RAM, but downstream steps still scale with cohort size. For a smoke run, the bundled example fixtures fit on any laptop. For a laptop-scale real subset, set max_patients= and/or narrow glob_patterns to keep the cache and task passes manageable; ≥16 GB of system RAM is a comfortable target for Polars plus the trainer. For the full PhysioNet export, prefer a fast SSD, a large disk, and plenty of RAM: total work scales with corpus size even though ingest memory stays bounded.

Bundled FHIR datasets

API reference

class pyhealth.datasets.FHIRDataset(root, config_path=None, glob_pattern=None, glob_patterns=None, output_format=None, max_patients=None, ingest_num_shards=None, cache_dir=None, num_workers=1, dev=False)

Bases: BaseDataset

FHIR resources flattened into per-type tables, then the standard pipeline.

Streams raw FHIR NDJSON/NDJSON.GZ exports into flattened tables (one per configured resource type) and pipelines them through BaseDataset for downstream task processing (global event dataframe, patient iteration, task sampling).

The entire ingest is driven by a single YAML config with three top-level sections — glob_patterns: (which NDJSON files to open), resource_specs: (how to project each FHIR resource type into a flat row), and tables: (how those rows are exposed as events downstream). See pyhealth/datasets/fhir/configs/mimic4fhir.yaml for a complete worked example and the FHIRDataset rst page for a section-by-section guide.

Pass config_path=... directly, or subclass and set DEFAULT_CONFIG_PATH to bundle a default (see MIMIC4FHIR).

Parameters:
  • root (str) – Path to the NDJSON/NDJSON.GZ export directory.

  • config_path (Optional[str]) – Path to the FHIR ingest YAML. Defaults to the class attribute DEFAULT_CONFIG_PATH. The YAML must contain a resource_specs: block; any glob_patterns: and tables: blocks are also read from here.

  • glob_pattern (Optional[str]) – Single glob for NDJSON files; overrides the YAML’s glob_patterns. Mutually exclusive with glob_patterns.

  • glob_patterns (Optional[Sequence[str]]) – Multiple glob patterns; overrides the YAML’s glob_patterns. Mutually exclusive with glob_pattern.

  • output_format (Optional[str]) – Flat-table format, one of parquet (default), csv, tsv. Defaults to the class attribute DEFAULT_OUTPUT_FORMAT.

  • max_patients (Optional[int]) – Limit ingest to the first N unique patient IDs.

  • ingest_num_shards (Optional[int]) – Ignored; retained for API compatibility.

  • cache_dir (Union[str, Path, None]) – Cache directory root (UUID subdir appended per config).

  • num_workers (int) – Worker processes for task sampling.

  • dev (bool) – Development mode; limits to 1000 patients if max_patients is None.
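Read together, max_patients and dev imply a simple patient cap. The sketch below assumes "first N" means the first N distinct patient IDs in stream order and that later rows of an already admitted patient are kept. effective_limit, admit_first_patients, and DEV_PATIENT_LIMIT are hypothetical names, and the real ingest may count or order patients differently.

```python
from typing import Iterable, Iterator, Optional, Set, Tuple

DEV_PATIENT_LIMIT = 1000  # cap applied by dev=True when max_patients is None


def effective_limit(max_patients: Optional[int], dev: bool) -> Optional[int]:
    """An explicit max_patients wins; otherwise dev mode caps at 1000."""
    if max_patients is not None:
        return max_patients
    return DEV_PATIENT_LIMIT if dev else None


def admit_first_patients(rows: Iterable[Tuple[str, dict]],
                         limit: Optional[int]) -> Iterator[Tuple[str, dict]]:
    """Keep rows whose patient is among the first `limit` distinct IDs seen."""
    admitted: Set[str] = set()
    for patient_id, row in rows:
        if patient_id not in admitted:
            if limit is not None and len(admitted) >= limit:
                continue  # a new patient beyond the cap: skip this row
            admitted.add(patient_id)
        yield patient_id, row
```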

Examples

>>> # ad-hoc, no subclass
>>> ds = FHIRDataset(
...     root="/data/fhir",
...     config_path="my_fhir.yaml",
... )
>>> # or a preconfigured source subclass
>>> from pyhealth.datasets import MIMIC4FHIR
>>> ds = MIMIC4FHIR(root="/data/mimic-iv-fhir", max_patients=500)

DEFAULT_CONFIG_PATH: Optional[str] = None

Default ingest YAML path; set by source subclasses to bundle a config.

DEFAULT_OUTPUT_FORMAT: str = 'parquet'

Default flat-table output format.

DATASET_NAME: str = 'fhir'

Dataset name used for cache identity / logging.

property prepared_tables_dir: Path

Directory holding the flattened per-resource tables that load_table() reads.

Return type:

Path

clean_tmpdir()

Cleans up the temporary directory within the cache.

Return type:

None

create_tmpdir()

Creates and returns a new temporary directory within the cache.

Returns:

The path to the new temporary directory.

Return type:

Path

property default_task: Optional[BaseTask]

Returns the default task for the dataset.

Returns:

The default task, if any.

Return type:

Optional[BaseTask]

get_patient(patient_id)

Retrieves a Patient object for the given patient ID.

Parameters:

patient_id (str) – The ID of the patient to retrieve.

Returns:

The Patient object for the given ID.

Return type:

Patient

Raises:

AssertionError – If the patient ID is not found in the dataset.

property global_event_df: LazyFrame

Returns a lazy frame over the cached global event dataframe.

Returns:

A LazyFrame scanning the cached event parquet.

Return type:

LazyFrame

iter_patients(df=None)

Yields Patient objects for each unique patient in the dataset.

Yields:

Iterator[Patient] – An iterator over Patient objects.

Return type:

Iterator[Patient]

load_data()

Loads data from the specified tables.

Returns:

A concatenated lazy frame of all tables.

Return type:

dd.DataFrame

set_task(task=None, num_workers=None, input_processors=None, output_processors=None)

Processes the base dataset to generate the task-specific sample dataset. The cache structure is as follows:

{task_name}_{task_uuid}/        # Cached data for specific task based on task name, schema, and args
    task_df.ld/                 # Intermediate task dataframe based on schema
    samples_{proc_uuid}.ld/     # Final processed samples after applying processors
        schema.pkl              # Saved SampleBuilder schema
        *.bin                   # Processed sample files

Parameters:
  • task (Optional[BaseTask]) – The task to set. Uses the default task if None.

  • num_workers (Optional[int]) – Number of worker processes for task sampling. Defaults to self.num_workers.

  • input_processors (Optional[Dict[str, FeatureProcessor]]) – Pre-fitted input processors. If provided, these will be used instead of creating new ones from task’s input_schema. Defaults to None.

  • output_processors (Optional[Dict[str, FeatureProcessor]]) – Pre-fitted output processors. If provided, these will be used instead of creating new ones from task’s output_schema. Defaults to None.

Returns:

The generated sample dataset.

Return type:

SampleDataset

Raises:

AssertionError – If no default task is found and task is None.

stats()

Prints statistics about the dataset.

Return type:

None

load_table(table_name)

Load one flattened table into the standard event schema.

Deviations from BaseDataset.load_table (CSV via _scan_csv_tsv_gz):

  • Reads pre-built flat tables (parquet/csv/tsv) under prepared_tables_dir.

  • Timestamp parsing uses errors="coerce" + utc=True, since FHIR ISO strings may carry a timezone suffix or be partial dates; unparseable values become NaT instead of raising.

  • Converts tz-aware timestamps to naive UTC for Dask compatibility.

  • Drops rows with null patient_id before returning.

Return type:

DataFrame
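The timestamp and patient-ID steps of load_table can be sketched with pandas on a small in-memory frame. normalise_flat_table and its column names are hypothetical; the real method works on Dask, and its column names come from the YAML's tables: entries. The sample timestamps share one ISO format on purpose: recent pandas versions infer a single format from the first value, so mixing full timestamps with date-only strings in one call may coerce some of them to NaT unless a format is given.

```python
import pandas as pd


def normalise_flat_table(df: pd.DataFrame, patient_col: str,
                         time_col: str) -> pd.DataFrame:
    """Coerce timestamps to naive UTC and drop rows without a patient."""
    out = df.copy()
    # errors="coerce": unparseable strings become NaT instead of raising.
    # utc=True: every offset-bearing timestamp is converted to UTC.
    ts = pd.to_datetime(out[time_col], errors="coerce", utc=True)
    # Values are already in UTC, so dropping the tz just yields naive UTC.
    out[time_col] = ts.dt.tz_localize(None)
    # Rows without a patient cannot be attached to anyone downstream.
    return out[out[patient_col].notna()].reset_index(drop=True)
```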

property unique_patient_ids: List[str]

Returns a list of unique patient IDs.

Returns:

List of unique patient IDs.

Return type:

List[str]