pyhealth.datasets.utils#

Several utility functions.

class pyhealth.datasets.utils.datetime(year, month, day[, hour[, minute[, second[, microsecond[, tzinfo]]]]])#

Bases: date

The year, month and day arguments are required. tzinfo may be None, or an instance of a tzinfo subclass. The remaining arguments may be ints.

astimezone()#

tz -> convert to local time in new timezone tz

combine()#

date, time -> datetime with same date and time fields

ctime()#

Return ctime() style string.

date()#

Return date object with same year, month and day.

day#
dst()#

Return self.tzinfo.dst(self).

fold#
fromisocalendar()#

int, int, int -> Construct a date from the ISO year, week number and weekday.

This is the inverse of the date.isocalendar() function

fromisoformat()#

string -> datetime from a string in most ISO 8601 formats

fromordinal()#

int -> date corresponding to a proleptic Gregorian ordinal.

fromtimestamp()#

timestamp[, tz] -> tz’s local time from POSIX timestamp.
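
A brief illustration, assuming this class is the standard library's datetime.datetime (the min, max and resolution values on this page suggest so): passing tz yields a timezone-aware result, and timestamp() converts it back.

```python
from datetime import datetime, timezone

# With tz given, the result is timezone-aware: 1970-01-01 00:00:00+00:00
epoch_utc = datetime.fromtimestamp(0, tz=timezone.utc)

# timestamp() is the inverse conversion and returns a float
round_trip = epoch_utc.timestamp()  # 0.0
```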

hour#
isocalendar()#

Return a named tuple containing ISO year, week number, and weekday.

isoformat()#

[sep] -> string in ISO 8601 format, YYYY-MM-DDT[HH[:MM[:SS[.mmm[uuu]]]]][+HH:MM]. sep is used to separate the date from the time, and defaults to ‘T’. The optional argument timespec specifies the number of additional terms of the time to include. Valid options are ‘auto’, ‘hours’, ‘minutes’, ‘seconds’, ‘milliseconds’ and ‘microseconds’.
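
The effect of sep and timespec can be sketched as follows, again assuming the standard library's datetime.datetime. The date and offset are arbitrary values chosen for illustration.

```python
from datetime import datetime, timedelta, timezone

# Arbitrary example value with a +02:00 offset
dt = datetime(2024, 3, 5, 14, 7, 9, 123456, tzinfo=timezone(timedelta(hours=2)))

default_iso = dt.isoformat()                               # '2024-03-05T14:07:09.123456+02:00'
space_minutes = dt.isoformat(sep=" ", timespec="minutes")  # '2024-03-05 14:07+02:00'
millis = dt.isoformat(timespec="milliseconds")             # '2024-03-05T14:07:09.123+02:00'
```

Note that timespec truncates rather than rounds, so 123456 microseconds become 123 milliseconds.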

isoweekday()#

Return the day of the week represented by the date. Monday == 1 … Sunday == 7

max = datetime.datetime(9999, 12, 31, 23, 59, 59, 999999)#
microsecond#
min = datetime.datetime(1, 1, 1, 0, 0)#
minute#
month#
now()#

Returns new datetime object representing current time local to tz.

tz

Timezone object.

If no tz is specified, uses local timezone.

replace()#

Return datetime with new specified fields.

resolution = datetime.timedelta(microseconds=1)#
second#
strftime()#

format -> strftime() style string.

strptime()#

string, format -> new datetime parsed from a string (like time.strptime()).

time()#

Return time object with same time but with tzinfo=None.

timestamp()#

Return POSIX timestamp as float.

timetuple()#

Return time tuple, compatible with time.localtime().

timetz()#

Return time object with same time and tzinfo.

today()#

Current date or datetime: same as self.__class__.fromtimestamp(time.time()).

toordinal()#

Return proleptic Gregorian ordinal. January 1 of year 1 is day 1.

tzinfo#
tzname()#

Return self.tzinfo.tzname(self).

utcfromtimestamp()#

Construct a naive UTC datetime from a POSIX timestamp.

utcnow()#

Return a new datetime representing UTC day and time.

utcoffset()#

Return self.tzinfo.utcoffset(self).

utctimetuple()#

Return UTC time tuple, compatible with time.localtime().

weekday()#

Return the day of the week represented by the date. Monday == 0 … Sunday == 6
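
The two weekday conventions and the ISO calendar round trip can be compared side by side. This is a small sketch assuming the standard library's datetime.datetime; the dates are arbitrary.

```python
from datetime import datetime

monday = datetime(2024, 1, 1)  # 1 January 2024 fell on a Monday
sunday = datetime(2024, 1, 7)

zero_based = (monday.weekday(), sunday.weekday())        # (0, 6)
iso_based = (monday.isoweekday(), sunday.isoweekday())   # (1, 7)

# isocalendar() yields ISO year, week number and weekday;
# fromisocalendar() (Python 3.8+) reverses it.
iso_year, iso_week, iso_day = monday.isocalendar()
rebuilt = datetime.fromisocalendar(iso_year, iso_week, iso_day)
```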

year#
class pyhealth.datasets.utils.Any(*args, **kwargs)[source]#

Bases: object

Special type indicating an unconstrained type.

  • Any is compatible with every type.

  • Any is assumed to have all methods.

  • All values are assumed to be instances of Any.

Note that all the above statements are true from the point of view of static type checkers. At runtime, Any should not be used with instance checks.

pyhealth.datasets.utils.dateutil_parse(timestr, parserinfo=None, **kwargs)#

Parse a string in one of the supported formats, using the parserinfo parameters.

Parameters:
  • timestr – A string containing a date/time stamp.

  • parserinfo – A parserinfo object containing parameters for the parser. If None, the default arguments to the parserinfo constructor are used.

The **kwargs parameter takes the following keyword arguments:

Parameters:
  • default – The default datetime object, if this is a datetime object and not None, elements specified in timestr replace elements in the default object.

  • ignoretz – If set True, time zones in parsed strings are ignored and a naive datetime object is returned.

  • tzinfos

    Additional time zone names / aliases which may be present in the string. This argument maps time zone names (and optionally offsets from those time zones) to time zones. This parameter can be a dictionary with timezone aliases mapping time zone names to time zones or a function taking two parameters (tzname and tzoffset) and returning a time zone.

    The timezones to which the names are mapped can be an integer offset from UTC in seconds or a tzinfo object.

    This parameter is ignored if ignoretz is set.

  • dayfirst – Whether to interpret the first value in an ambiguous 3-integer date (e.g. 01/05/09) as the day (True) or month (False). If yearfirst is set to True, this distinguishes between YDM and YMD. If set to None, this value is retrieved from the current parserinfo object (which itself defaults to False).

  • yearfirst – Whether to interpret the first value in an ambiguous 3-integer date (e.g. 01/05/09) as the year. If True, the first number is taken to be the year, otherwise the last number is taken to be the year. If this is set to None, the value is retrieved from the current parserinfo object (which itself defaults to False).

  • fuzzy – Whether to allow fuzzy parsing, allowing for strings like “Today is January 1, 2047 at 8:21:00AM”.

  • fuzzy_with_tokens – If True, fuzzy is automatically set to True, and the parser returns a tuple where the first element is the parsed datetime.datetime object and the second element is a tuple containing the portions of the string which were ignored.

Returns:

Returns a datetime.datetime object or, if the fuzzy_with_tokens option is True, returns a tuple, the first element being a datetime.datetime object, the second a tuple containing the fuzzy tokens.

Raises:
  • ParserError – Raised for invalid or unknown string formats, if the provided tzinfo is not in a valid format, or if an invalid date would be created.

  • OverflowError – Raised if the parsed date exceeds the largest valid C integer on your system.
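
The dayfirst, yearfirst and fuzzy_with_tokens options are easiest to see on the ambiguous strings used above. The sketch below assumes that dateutil_parse is dateutil's parser.parse re-exported under another name, which the signature suggests but this page does not state.

```python
from datetime import datetime
from dateutil import parser  # assumption: dateutil_parse refers to parser.parse

ambiguous = "01/05/09"
month_first = parser.parse(ambiguous)                 # default: month/day/year
day_first = parser.parse(ambiguous, dayfirst=True)    # day/month/year
year_first = parser.parse(ambiguous, yearfirst=True)  # year/month/day

# fuzzy_with_tokens returns the datetime plus the ignored text fragments
parsed, skipped = parser.parse(
    "Today is January 1, 2047 at 8:21:00AM", fuzzy_with_tokens=True
)
```

Here month_first is 5 January 2009, day_first is 1 May 2009, and year_first is 9 May 2001.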

pyhealth.datasets.utils.pad_sequence(sequences, batch_first=False, padding_value=0.0, padding_side='right')[source]#

Pad a list of variable length Tensors with padding_value.

pad_sequence stacks a list of Tensors along a new dimension, and pads them to equal length. sequences can be a list of sequences with size L x *, where L is the length of the sequence and * is any number of dimensions (including 0). If batch_first is False, the output is of size T x B x *, and B x T x * otherwise, where B is the batch size (the number of elements in sequences) and T is the length of the longest sequence.

Example

>>> import torch
>>> from torch.nn.utils.rnn import pad_sequence
>>> a = torch.ones(25, 300)
>>> b = torch.ones(22, 300)
>>> c = torch.ones(15, 300)
>>> pad_sequence([a, b, c]).size()
torch.Size([25, 3, 300])

Note

This function returns a Tensor of size T x B x * or B x T x * where T is the length of the longest sequence. This function assumes the trailing dimensions and type of all the Tensors in sequences are the same.

Parameters:
  • sequences (list[Tensor]) – list of variable length sequences.

  • batch_first (bool, optional) – if True, the output will be in B x T x * format, T x B x * otherwise.

  • padding_value (float, optional) – value for padded elements. Default: 0.

  • padding_side (str, optional) – the side to pad the sequences on. Default: 'right'.

Return type:

Tensor

Returns:

Tensor of size T x B x * if batch_first is False. Tensor of size B x T x * otherwise

class pyhealth.datasets.utils.DataLoader(dataset, batch_size=1, shuffle=None, sampler=None, batch_sampler=None, num_workers=0, collate_fn=None, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, multiprocessing_context=None, generator=None, *, prefetch_factor=None, persistent_workers=False, pin_memory_device='', in_order=True)[source]#

Bases: Generic[_T_co]

Data loader combines a dataset and a sampler, and provides an iterable over the given dataset.

The DataLoader supports both map-style and iterable-style datasets with single- or multi-process loading, customizing loading order and optional automatic batching (collation) and memory pinning.

See torch.utils.data documentation page for more details.

Parameters:
  • dataset (Dataset) – dataset from which to load the data.

  • batch_size (int, optional) – how many samples per batch to load (default: 1).

  • shuffle (bool, optional) – set to True to have the data reshuffled at every epoch (default: False).

  • sampler (Sampler or Iterable, optional) – defines the strategy to draw samples from the dataset. Can be any Iterable with __len__ implemented. If specified, shuffle must not be specified.

  • batch_sampler (Sampler or Iterable, optional) – like sampler, but returns a batch of indices at a time. Mutually exclusive with batch_size, shuffle, sampler, and drop_last.

  • num_workers (int, optional) – how many subprocesses to use for data loading. 0 means that the data will be loaded in the main process. (default: 0)

  • collate_fn (Callable, optional) – merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.

  • pin_memory (bool, optional) – If True, the data loader will copy Tensors into device/CUDA pinned memory before returning them. If your data elements are a custom type, or your collate_fn returns a batch that is a custom type, see the example below.

  • drop_last (bool, optional) – set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. (default: False)

  • timeout (numeric, optional) – if positive, the timeout value for collecting a batch from workers. Should always be non-negative. (default: 0)

  • worker_init_fn (Callable, optional) – If not None, this will be called on each worker subprocess with the worker id (an int in [0, num_workers - 1]) as input, after seeding and before data loading. (default: None)

  • multiprocessing_context (str or multiprocessing.context.BaseContext, optional) – If None, the default multiprocessing context of your operating system will be used. (default: None)

  • generator (torch.Generator, optional) – If not None, this RNG will be used by RandomSampler to generate random indexes and multiprocessing to generate base_seed for workers. (default: None)

  • prefetch_factor (int, optional, keyword-only arg) – Number of batches loaded in advance by each worker. 2 means there will be a total of 2 * num_workers batches prefetched across all workers. (default value depends on the set value for num_workers. If value of num_workers=0 default is None. Otherwise, if value of num_workers > 0 default is 2).

  • persistent_workers (bool, optional) – If True, the data loader will not shut down the worker processes after a dataset has been consumed once. This keeps the workers’ Dataset instances alive. (default: False)

  • pin_memory_device (str, optional) – the device to pin_memory on if pin_memory is True. If not given, the current accelerator will be the default. This argument is discouraged and subject to deprecation.

  • in_order (bool, optional) – If False, the data loader will not enforce that batches are returned in a first-in, first-out order. Only applies when num_workers > 0. (default: True)

Warning

If the spawn start method is used, worker_init_fn cannot be an unpicklable object, e.g., a lambda function. See multiprocessing-best-practices for more details related to multiprocessing in PyTorch.

Warning

len(dataloader) heuristic is based on the length of the sampler used. When dataset is an IterableDataset, it instead returns an estimate based on len(dataset) / batch_size, with proper rounding depending on drop_last, regardless of multi-process loading configurations. This represents the best guess PyTorch can make because PyTorch trusts user dataset code in correctly handling multi-process loading to avoid duplicate data.

However, if sharding results in multiple workers having incomplete last batches, this estimate can still be inaccurate, because (1) an otherwise complete batch can be broken into multiple ones and (2) more than one batch worth of samples can be dropped when drop_last is set. Unfortunately, PyTorch can not detect such cases in general.

See Dataset Types for more details on these two types of datasets and how IterableDataset interacts with Multi-process data loading.
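
The rounding rule and the two failure modes described in this warning can be worked through with plain arithmetic. The helper names below are hypothetical and not part of PyTorch, and the sketch assumes each worker batches its own shard independently.

```python
import math

def estimated_num_batches(num_samples, batch_size, drop_last):
    """Length estimate: floor when drop_last is set, ceiling otherwise."""
    if drop_last:
        return num_samples // batch_size
    return math.ceil(num_samples / batch_size)

def sharded_num_batches(shard_sizes, batch_size, drop_last):
    """Batches actually produced when every worker batches its own shard."""
    return sum(estimated_num_batches(n, batch_size, drop_last) for n in shard_sizes)

# (1) 10 samples split over 2 workers: a complete batch is broken up.
estimate_keep = estimated_num_batches(10, 4, drop_last=False)   # 3
actual_keep = sharded_num_batches([5, 5], 4, drop_last=False)   # 4

# (2) 14 samples split over 2 workers: each drops 3, so 6 samples are lost,
# which is more than one batch of 4.
estimate_drop = estimated_num_batches(14, 4, drop_last=True)    # 3
actual_drop = sharded_num_batches([7, 7], 4, drop_last=True)    # 2
```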

Warning

See the reproducibility, dataloader-workers-random-seed, and data-loading-randomness notes for questions related to random seeds.

Warning

Setting in_order to False can harm reproducibility and may lead to a skewed data distribution being fed to the trainer in cases with imbalanced data.

num_workers: int#
prefetch_factor: Optional[int]#
pin_memory: bool#
pin_memory_device: str#
timeout: float#
dataset: Dataset[_T_co]#
batch_size: Optional[int]#
drop_last: bool#
sampler: Union[Sampler, Iterable]#
property multiprocessing_context#
check_worker_number_rationality()[source]#
pyhealth.datasets.utils.create_directory(directory)[source]#
pyhealth.datasets.utils.hash_str(s)[source]#
pyhealth.datasets.utils.strptime(s)[source]#

Helper function which parses a string to datetime object.

Parameters:

s (str) – string to be parsed.

Return type:

Optional[datetime]

Returns:

Optional[datetime], the parsed datetime object. If s is NaN, returns None.

pyhealth.datasets.utils.padyear(year, month='1', day='1')[source]#

Pads a year string of format ‘YYYY’ to a date string of format ‘YYYY-MM-DD’.

Parameters:
  • year (str) – year to be padded. Must be a non-zero value.

  • month (str) – month string to be used as padding. Must be in [1, 12].

  • day (str) – day string to be used as padding. Must be in [1, 31].

Returns:

padded_date, the year padded to a full date string.

Return type:

str

pyhealth.datasets.utils.flatten_list(l)[source]#

Flattens a list of lists.

Parameters:

l (List) – the list of lists to be flattened.

Return type:

List

Returns:

List, the flattened list.

Examples

>>> flatten_list([[1], [2, 3], [4]])
[1, 2, 3, 4]
>>> flatten_list([[1], [[2], 3], [4]])
[1, [2], 3, 4]
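
As the second example shows, only one level of nesting is removed. A minimal sketch of that behaviour follows. flatten_one_level is a hypothetical stand-in, not the library's implementation.

```python
from itertools import chain

def flatten_one_level(nested):
    """Concatenate the inner lists, leaving deeper nesting untouched."""
    return list(chain.from_iterable(nested))

flat = flatten_one_level([[1], [2, 3], [4]])
partly_flat = flatten_one_level([[1], [[2], 3], [4]])  # the inner [2] survives
```
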
pyhealth.datasets.utils.list_nested_levels(l)[source]#

Gets all the different nested levels of a list.

Parameters:

l (List) – the list to be checked.

Return type:

Tuple[int]

Returns:

All the different nested levels of the list.

Examples

>>> list_nested_levels([])
(1,)
>>> list_nested_levels([1, 2, 3])
(1,)
>>> list_nested_levels([[]])
(2,)
>>> list_nested_levels([[1, 2, 3], [4, 5, 6]])
(2,)
>>> list_nested_levels([1, [2, 3], 4])
(1, 2)
>>> list_nested_levels([[1, [2, 3], 4]])
(2, 3)
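
One way to read these examples: every non-list element and every empty list contributes the depth at which it sits. The recursive sketch below reproduces the outputs above under that reading. nested_levels is a hypothetical name and the library's own implementation may differ.

```python
def nested_levels(value):
    """Return the sorted nesting depths at which leaves or empty lists occur."""
    if not isinstance(value, list):
        return (0,)  # a non-list leaf adds no depth of its own
    if not value:
        return (1,)  # an empty list still counts as one level
    depths = set()
    for item in value:
        for depth in nested_levels(item):
            depths.add(depth + 1)  # one more level for the enclosing list
    return tuple(sorted(depths))

mixed = nested_levels([1, [2, 3], 4])
deeper = nested_levels([[1, [2, 3], 4]])
```
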
pyhealth.datasets.utils.is_homo_list(l)[source]#

Checks if a list is homogeneous.

Parameters:

l (List) – the list to be checked.

Return type:

bool

Returns:

bool, True if the list is homogeneous, False otherwise.

Examples

>>> is_homo_list([1, 2, 3])
True
>>> is_homo_list([])
True
>>> is_homo_list([1, 2, "3"])
False
>>> is_homo_list([1, 2, 3, [4, 5, 6]])
False
pyhealth.datasets.utils.collate_fn_dict(batch)[source]#

Collates a batch of data into a dictionary of lists.

Parameters:

batch (List[dict]) – List of dictionaries, where each dictionary represents a data sample.

Return type:

dict

Returns:

A dictionary where each key corresponds to a list of values from the batch.
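
The list-of-dicts to dict-of-lists transformation described here can be sketched in a few lines. collate_to_dict_of_lists is a hypothetical stand-in, the sample keys are made up for illustration, and the sketch assumes every sample has the same keys.

```python
def collate_to_dict_of_lists(batch):
    """Gather the values of each key across all samples, in batch order."""
    return {key: [sample[key] for sample in batch] for key in batch[0]}

# Hypothetical samples; the keys are illustrative only
batch = [
    {"patient_id": "p1", "label": 0},
    {"patient_id": "p2", "label": 1},
]
collated = collate_to_dict_of_lists(batch)
```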

pyhealth.datasets.utils.collate_fn_dict_with_padding(batch)[source]#

Collates a batch of data into a dictionary with padding for tensor values.

Parameters:

batch (List[dict]) – List of dictionaries, where each dictionary represents a data sample.

Return type:

dict

Returns:

A dictionary where each key corresponds to a list of values from the batch. Tensor values are padded to the same shape. Tuples of (time, values) from temporal processors are collated separately.

pyhealth.datasets.utils.get_dataloader(dataset, batch_size, shuffle=False)[source]#

Creates a DataLoader for a given dataset.

Parameters:
  • dataset (StreamingDataset) – The dataset to load data from.

  • batch_size (int) – The number of samples per batch.

  • shuffle (bool) – Whether to shuffle the data at every epoch.

Return type:

DataLoader

Returns:

A DataLoader instance for the dataset.

pyhealth.datasets.utils.save_processors(sample_dataset, output_dir)[source]#

Save input and output processors to pickle files.

This function saves the fitted processors from a SampleDataset to disk, allowing them to be reused in future runs for consistent feature encoding.

Parameters:
  • sample_dataset – SampleDataset with fitted processors

  • output_dir (str) – Directory to save processor files

Returns:

Paths where processors were saved, with keys ‘input_processors’ and ‘output_processors’

Return type:

Dict[str, str]

Example

>>> from pyhealth.datasets import save_processors
>>> sample_dataset = base_dataset.set_task(task)
>>> paths = save_processors(sample_dataset, "./output/processors")
>>> print(paths["input_processors"])
./output/processors/input_processors.pkl
pyhealth.datasets.utils.load_processors(processor_dir)[source]#

Load input and output processors from pickle files.

This function loads previously saved processors from disk, allowing consistent feature encoding across different runs without refitting.

Parameters:

processor_dir (str) – Directory containing processor pickle files

Returns:

(input_processors, output_processors)

Return type:

Tuple[Dict, Dict]

Raises:

FileNotFoundError – If processor files are not found

Example

>>> from pyhealth.datasets import load_processors
>>> input_procs, output_procs = load_processors("./output/processors")
>>> sample_dataset = base_dataset.set_task(
...     task,
...     input_processors=input_procs,
...     output_processors=output_procs
... )
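
The save and load steps amount to a pickle round trip. The sketch below uses only the standard library. save_pair and load_pair are hypothetical helpers, the file name output_processors.pkl is an assumption modelled on the input_processors.pkl path shown earlier, and the dictionaries stand in for real fitted processors.

```python
import os
import pickle
import tempfile

FILE_NAMES = {
    "input_processors": "input_processors.pkl",
    "output_processors": "output_processors.pkl",  # assumed file name
}

def save_pair(input_processors, output_processors, output_dir):
    """Pickle both objects into output_dir and return the written paths."""
    os.makedirs(output_dir, exist_ok=True)
    objects = {
        "input_processors": input_processors,
        "output_processors": output_processors,
    }
    paths = {}
    for key, obj in objects.items():
        paths[key] = os.path.join(output_dir, FILE_NAMES[key])
        with open(paths[key], "wb") as f:
            pickle.dump(obj, f)
    return paths

def load_pair(processor_dir):
    """Load both objects; open() raises FileNotFoundError if a file is missing."""
    loaded = []
    for key in ("input_processors", "output_processors"):
        with open(os.path.join(processor_dir, FILE_NAMES[key]), "rb") as f:
            loaded.append(pickle.load(f))
    return tuple(loaded)

with tempfile.TemporaryDirectory() as tmp:
    paths = save_pair({"codes": ["a", "b"]}, {"label": [0, 1]}, tmp)
    input_procs, output_procs = load_pair(tmp)
```

Only unpickle files from a trusted source, since loading a pickle can execute arbitrary code.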