Source code for pyhealth.processors.timeseries_processor

from datetime import datetime, timedelta
from typing import Any, List, Tuple

import numpy as np
import torch

from . import register_processor
from .base_processor import FeatureProcessor


[docs]@register_processor("timeseries") class TimeseriesProcessor(FeatureProcessor): """ Feature processor for irregular time series with missing values. Input: - timestamps: List[datetime] - values: np.ndarray of shape (T, F) Processing: 1. Uniform sampling at fixed intervals. 2. Imputation for missing values. Output: - torch.Tensor of shape (S, F), where S is the number of sampled time steps. """ def __init__( self, sampling_rate: timedelta = timedelta(hours=1), impute_strategy: str = "forward_fill", ): # Configurable sampling rate and imputation method self.sampling_rate = sampling_rate self.impute_strategy = impute_strategy self.n_features = None
[docs] def fit(self, samples: Any, field: str) -> None: """Fit the processor by determining n_features from the first valid sample. Args: samples: Iterable of sample dictionaries. field: The field name to extract from samples. """ # Extract n_features from the first valid sample without full processing for sample in samples: if field in sample and sample[field] is not None: _, values = sample[field] values = np.asarray(values) if values.ndim == 2: self.n_features = values.shape[1] break elif values.ndim == 1: self.n_features = 1 break
[docs] def process(self, value: Tuple[List[datetime], np.ndarray]) -> torch.Tensor: timestamps, values = value if len(timestamps) == 0: raise ValueError("Timestamps list is empty.") values = np.asarray(values) num_features = values.shape[1] # Step 1: Uniform sampling start_time = timestamps[0] end_time = timestamps[-1] total_steps = int((end_time - start_time) / self.sampling_rate) + 1 sampled_times = [ start_time + i * self.sampling_rate for i in range(total_steps) ] sampled_values = np.full((total_steps, num_features), np.nan) # Map original timestamps to indices in the sampled grid for t, v in zip(timestamps, values): idx = int((t - start_time) / self.sampling_rate) if 0 <= idx < total_steps: sampled_values[idx] = v # Step 2: Imputation if self.impute_strategy == "forward_fill": for f in range(num_features): last_value = 0.0 for t in range(total_steps): if not np.isnan(sampled_values[t, f]): last_value = sampled_values[t, f] else: sampled_values[t, f] = last_value elif self.impute_strategy == "zero": sampled_values = np.nan_to_num(sampled_values, nan=0.0) else: raise ValueError(f"Unsupported imputation strategy: {self.impute_strategy}") if self.n_features is None: self.n_features = sampled_values.shape[1] return torch.tensor(sampled_values, dtype=torch.float)
[docs] def size(self): # Size equals number of features, unknown until first process return self.n_features
[docs] def is_token(self) -> bool: """Time series values are continuous, not discrete tokens.""" return False
[docs] def schema(self) -> tuple[str, ...]: return ("value",)
[docs] def dim(self) -> tuple[int, ...]: """Output is a 2D tensor (time_steps, features).""" return (2,)
[docs] def spatial(self) -> tuple[bool, ...]: # Time dimension is spatial; feature dimension is not return (True, False)
def __repr__(self): return ( f"TimeSeriesProcessor(sampling_rate={self.sampling_rate}, " f"impute_strategy='{self.impute_strategy}')" )